[SCM] Samba Shared Repository - branch master updated
Andrew Bartlett
abartlet at samba.org
Mon Apr 12 00:39:01 UTC 2021
The branch, master has been updated
via 768d48fca9f tests python krb5: MS-KILE client principal look-up
from 534de9b2827 VFS: Remove SMB_VFS_CHMOD, no longer used
https://git.samba.org/?p=samba.git;a=shortlog;h=master
- Log -----------------------------------------------------------------
commit 768d48fca9f8c7527c0d12e7acc8942b5fd36ac2
Author: Gary Lockyer <gary at catalyst.net.nz>
Date: Wed Feb 17 12:15:50 2021 +1300
tests python krb5: MS-KILE client principal look-up
Tests of [MS-KILE]: Kerberos Protocol Extensions
section 3.3.5.6.1 Client Principal Lookup
Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
Reviewed-by: Isaac Boukris <iboukris at samba.org>
Autobuild-User(master): Andrew Bartlett <abartlet at samba.org>
Autobuild-Date(master): Mon Apr 12 00:38:26 UTC 2021 on sn-devel-184
-----------------------------------------------------------------------
Summary of changes:
python/samba/tests/krb5/kdc_base_test.py | 29 +-
.../krb5/ms_kile_client_principal_lookup_tests.py | 814 +++++++++++++++++++++
python/samba/tests/usage.py | 1 +
selftest/knownfail_heimdal_kdc | 12 +
selftest/knownfail_mit_kdc | 16 +
source4/selftest/tests.py | 3 +
6 files changed, 874 insertions(+), 1 deletion(-)
create mode 100755 python/samba/tests/krb5/ms_kile_client_principal_lookup_tests.py
Changeset truncated at 500 lines:
diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py
index bef5458c881..1c7f05dda6d 100644
--- a/python/samba/tests/krb5/kdc_base_test.py
+++ b/python/samba/tests/krb5/kdc_base_test.py
@@ -22,6 +22,7 @@ import os
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
from collections import namedtuple
+import ldb
from ldb import SCOPE_BASE
from samba import generate_random_password
from samba.auth import system_session
@@ -103,7 +104,7 @@ class KDCBaseTest(RawKerberosTest):
for dn in self.accounts:
delete_force(self.ldb, dn)
- def create_account(self, name, machine_account=False, spn=None):
+ def create_account(self, name, machine_account=False, spn=None, upn=None):
'''Create an account for testing.
The dn of the created account is added to self.accounts,
which is used by tearDown to clean up the created accounts.
@@ -133,6 +134,8 @@ class KDCBaseTest(RawKerberosTest):
"unicodePwd": utf16pw}
if spn is not None:
details["servicePrincipalName"] = spn
+ if upn is not None:
+ details["userPrincipalName"] = upn
self.ldb.add(details)
creds = Credentials()
@@ -418,3 +421,27 @@ class KDCBaseTest(RawKerberosTest):
self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
sid = self.ldb.schema_format_value("objectSID", res[0]["objectSID"][0])
return sid.decode('utf8')
+
+    def _modify_with_flag(self, dn_str, name, value, flag):
+        '''Send a one-attribute modify request for the object at dn_str.
+
+        dn_str: string form of the DN of the object to change
+        name:   name of the attribute to change
+        value:  a single value, or a list of values; a single value is
+                wrapped in a one-element list
+        flag:   the ldb.FLAG_MOD_* constant stored in the MessageElement
+        '''
+        values = value if isinstance(value, list) else [value]
+
+        msg = ldb.Message(ldb.Dn(self.ldb, dn_str))
+        msg[name] = ldb.MessageElement(values, flag, name)
+        self.ldb.modify(msg)
+
+    def add_attribute(self, dn_str, name, value):
+        '''Modify attribute name of dn_str using ldb.FLAG_MOD_ADD.
+
+        value may be a single value or a list of values.
+        '''
+        self._modify_with_flag(dn_str, name, value, ldb.FLAG_MOD_ADD)
+
+    def modify_attribute(self, dn_str, name, value):
+        '''Modify attribute name of dn_str using ldb.FLAG_MOD_REPLACE.
+
+        value may be a single value or a list of values.
+        '''
+        self._modify_with_flag(dn_str, name, value, ldb.FLAG_MOD_REPLACE)
diff --git a/python/samba/tests/krb5/ms_kile_client_principal_lookup_tests.py b/python/samba/tests/krb5/ms_kile_client_principal_lookup_tests.py
new file mode 100755
index 00000000000..356a25f8e18
--- /dev/null
+++ b/python/samba/tests/krb5/ms_kile_client_principal_lookup_tests.py
@@ -0,0 +1,814 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+# Copyright (C) Stefan Metzmacher 2020
+# Copyright (C) 2020 Catalyst.Net Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.dsdb import UF_NORMAL_ACCOUNT, UF_DONT_REQUIRE_PREAUTH
+from samba.tests.krb5.kdc_base_test import KDCBaseTest
+from samba.tests.krb5.rfc4120_constants import (
+ AES256_CTS_HMAC_SHA1_96,
+ ARCFOUR_HMAC_MD5,
+ NT_ENTERPRISE_PRINCIPAL,
+ NT_PRINCIPAL,
+ NT_SRV_INST,
+ KDC_ERR_C_PRINCIPAL_UNKNOWN,
+)
+
+global_asn1_print = False
+global_hexdump = False
+
+
+class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
+ ''' Tests for MS-KILE client principal look-up
+ See [MS-KILE]: Kerberos Protocol Extensions
+ section 3.3.5.6.1 Client Principal Lookup
+ '''
+
+    def setUp(self):
+        '''Run the base-class set-up, then copy the module-level debug
+        switches (hexdump and ASN.1 printing) onto this test instance.
+        '''
+        super().setUp()
+        self.do_hexdump = global_hexdump
+        self.do_asn1_print = global_asn1_print
+
+    def check_pac(self, auth_data, dn, uc, name, upn=None):
+        '''Assert that the PAC data in auth_data matches the given account.
+
+        auth_data: value handed to self.get_pac_data()
+        dn:        DN handed to self.get_objectSid() for the expected SID
+        uc:        object providing get_username() and get_realm()
+        name:      expected logon name; one trailing '$' is ignored
+        upn:       expected UPN, defaults to name@<lower-cased realm>
+        '''
+
+        pac = self.get_pac_data(auth_data)
+        expected_sid = self.get_objectSid(dn)
+
+        # The default UPN is built from the name exactly as passed in,
+        # i.e. before a trailing '$' is removed for the logon name check.
+        expected_upn = upn
+        if expected_upn is None:
+            expected_upn = "%s@%s" % (name, uc.get_realm().lower())
+        expected_logon = name[:-1] if name.endswith('$') else name
+
+        self.assertEqual(
+            uc.get_username(), str(pac.account_name),
+            "pac_data = {%s}" % str(pac))
+        self.assertEqual(
+            expected_logon, pac.logon_name,
+            "pac_data = {%s}" % str(pac))
+        self.assertEqual(
+            uc.get_realm(), pac.domain_name,
+            "pac_data = {%s}" % str(pac))
+        self.assertEqual(
+            expected_upn, pac.upn,
+            "pac_data = {%s}" % str(pac))
+        self.assertEqual(
+            expected_sid, pac.account_sid,
+            "pac_data = {%s}" % str(pac))
+
+    def test_nt_principal_step_1(self):
+        ''' Step 1
+            For an NT_PRINCIPAL cname with no realm, or with a realm that
+            matches the DC's domain, search for an account whose
+            sAMAccountName matches the cname.
+        '''
+
+        # Accounts for the test: a user as the client, and a machine
+        # account as the target of the service ticket.
+        user_name = "mskileusr"
+        (user_creds, user_dn) = self.create_account(user_name)
+        realm = user_creds.get_realm().lower()
+
+        mach_name = "mskilemac"
+        (mach_creds, _) = self.create_account(
+            mach_name, machine_account=True)
+
+        # Initial AS-REQ without padata, should get a pre-authentication
+        # required response.
+        etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[user_name])
+        krbtgt = self.PrincipalName_create(
+            name_type=NT_SRV_INST, names=["krbtgt", realm])
+
+        as_rep = self.as_req(client, krbtgt, realm, etypes)
+        self.check_pre_authenication(as_rep)
+
+        # Second AS-REQ, with padata derived from the first reply.
+        padata = self.get_pa_data(user_creds, as_rep)
+        reply_key = self.get_as_rep_key(user_creds, as_rep)
+        as_rep = self.as_req(client, krbtgt, realm, etypes, padata=padata)
+        self.check_as_reply(as_rep)
+
+        # Request a ticket to the machine account, using the ticket and
+        # key returned in the AS reply.
+        tgt = as_rep['ticket']
+        as_enc = self.get_as_rep_enc_data(reply_key, as_rep)
+        session_key = self.EncryptionKey_import(as_enc['key'])
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[user_name])
+        service = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[mach_creds.get_username()])
+
+        (tgs_rep, _) = self.tgs_req(
+            client, service, user_creds.get_realm(),
+            tgt, session_key, etypes)
+        self.check_tgs_reply(tgs_rep)
+
+        # Decode the service ticket with the machine account credentials
+        # and check the PAC it carries.
+        service_ticket = tgs_rep['ticket']
+        ticket_enc = self.decode_service_ticket(mach_creds, service_ticket)
+        self.check_pac(
+            ticket_enc['authorization-data'],
+            user_dn, user_creds, user_name)
+
+        # The ticket must name the client by user name, in the upper-cased
+        # realm.
+        ticket_cname = ticket_enc['cname']
+        self.assertEqual(NT_PRINCIPAL, ticket_cname['name-type'])
+        self.assertEqual(
+            user_name.encode('UTF8'), ticket_cname['name-string'][0])
+        self.assertEqual(
+            realm.upper().encode('UTF8'), ticket_enc['crealm'])
+
+
+    def test_nt_principal_step_2(self):
+        ''' Step 2
+            If not found
+                search for sAMAccountName equal to the cname + "$"
+
+        '''
+
+        # Create a single machine account for the test. It acts both as
+        # the client (named in the cname without the trailing '$') and as
+        # the target of the service ticket, so it is only created once.
+        mach_name = "mskilemac"
+        (mc, dn) = self.create_account(mach_name, machine_account=True)
+        realm = mc.get_realm().lower()
+
+        # Do the initial AS-REQ, should get a pre-authentication required
+        # response
+        etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+        cname = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[mach_name])
+        sname = self.PrincipalName_create(
+            name_type=NT_SRV_INST, names=["krbtgt", realm])
+
+        rep = self.as_req(cname, sname, realm, etype)
+        self.check_pre_authenication(rep)
+
+        # Do the next AS-REQ, with padata derived from the first reply
+        padata = self.get_pa_data(mc, rep)
+        key = self.get_as_rep_key(mc, rep)
+        rep = self.as_req(cname, sname, realm, etype, padata=padata)
+        self.check_as_reply(rep)
+
+        # Request a ticket to the machine account, using the ticket and
+        # key returned in the AS reply
+        ticket = rep['ticket']
+        enc_part2 = self.get_as_rep_enc_data(key, rep)
+        key = self.EncryptionKey_import(enc_part2['key'])
+        cname = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL,
+            names=[mach_name])
+        sname = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL,
+            names=[mc.get_username()])
+
+        (rep, enc_part) = self.tgs_req(
+            cname, sname, mc.get_realm(), ticket, key, etype)
+        self.check_tgs_reply(rep)
+
+        # Check the contents of the pac, and the ticket
+        ticket = rep['ticket']
+        enc_part = self.decode_service_ticket(mc, ticket)
+        self.check_pac(enc_part['authorization-data'], dn, mc, mach_name + '$')
+        # check the crealm and cname; the cname is the name as requested,
+        # i.e. without the trailing '$'
+        cname = enc_part['cname']
+        self.assertEqual(NT_PRINCIPAL, cname['name-type'])
+        self.assertEqual(mach_name.encode('UTF8'), cname['name-string'][0])
+        self.assertEqual(realm.upper().encode('UTF8'), enc_part['crealm'])
+
+
+    def test_nt_principal_step_3(self):
+        ''' Step 3
+
+            If not found
+                search for a matching UPN name where the UPN is set to
+                cname@realm or cname@DC's domain name
+
+        '''
+        # Accounts for the test: a user whose userPrincipalName differs
+        # from its account name, and a machine account as the target of
+        # the service ticket.
+        user_name = "mskileusr"
+        upn_name = "mskileupn"
+        upn = upn_name + "@" + self.credentials.get_realm().lower()
+        (user_creds, user_dn) = self.create_account(user_name, upn=upn)
+        realm = user_creds.get_realm().lower()
+
+        mach_name = "mskilemac"
+        (mach_creds, _) = self.create_account(
+            mach_name, machine_account=True)
+
+        # Initial AS-REQ naming the client by the UPN prefix, should get
+        # a pre-authentication required response.
+        etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[upn_name])
+        krbtgt = self.PrincipalName_create(
+            name_type=NT_SRV_INST, names=["krbtgt", realm])
+
+        as_rep = self.as_req(client, krbtgt, realm, etypes)
+        self.check_pre_authenication(as_rep)
+
+        # Second AS-REQ, with padata derived from the first reply.
+        padata = self.get_pa_data(user_creds, as_rep)
+        reply_key = self.get_as_rep_key(user_creds, as_rep)
+        as_rep = self.as_req(client, krbtgt, realm, etypes, padata=padata)
+        self.check_as_reply(as_rep)
+
+        # Request a ticket to the machine account, using the ticket and
+        # key returned in the AS reply.
+        tgt = as_rep['ticket']
+        as_enc = self.get_as_rep_enc_data(reply_key, as_rep)
+        session_key = self.EncryptionKey_import(as_enc['key'])
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[upn_name])
+        service = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[mach_creds.get_username()])
+
+        (tgs_rep, _) = self.tgs_req(
+            client, service, user_creds.get_realm(),
+            tgt, session_key, etypes)
+        self.check_tgs_reply(tgs_rep)
+
+        # Decode the service ticket with the machine account credentials
+        # and check the PAC it carries.
+        service_ticket = tgs_rep['ticket']
+        ticket_enc = self.decode_service_ticket(mach_creds, service_ticket)
+        self.check_pac(
+            ticket_enc['authorization-data'],
+            user_dn, user_creds, upn_name)
+
+        # The ticket must name the client by the UPN prefix, in the
+        # upper-cased realm.
+        ticket_cname = ticket_enc['cname']
+        self.assertEqual(NT_PRINCIPAL, ticket_cname['name-type'])
+        self.assertEqual(
+            upn_name.encode('UTF8'), ticket_cname['name-string'][0])
+        self.assertEqual(
+            realm.upper().encode('UTF8'), ticket_enc['crealm'])
+
+
+    def test_nt_principal_step_4_a(self):
+        ''' Step 4, no pre-authentication
+            If not found and no pre-authentication
+                search for a matching altSecurityIdentity
+        '''
+        # The user account gets an altSecurityIdentities value and has
+        # UF_DONT_REQUIRE_PREAUTH set.
+        #
+        # Note that in this case IDL_DRSCrackNames is called with
+        #      pmsgIn.formatOffered set to
+        #          DS_USER_PRINCIPAL_NAME_AND_ALTSECID
+        #
+        # Setting UF_DONT_REQUIRE_PREAUTH seems to be the only way
+        # to trigger the no pre-auth step.
+
+        user_name = "mskileusr"
+        alt_name = "mskilealtsec"
+        (user_creds, user_dn) = self.create_account(user_name)
+        realm = user_creds.get_realm().lower()
+        alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
+        self.add_attribute(user_dn, "altSecurityIdentities", alt_sec)
+        no_preauth_uac = str(UF_NORMAL_ACCOUNT | UF_DONT_REQUIRE_PREAUTH)
+        self.modify_attribute(user_dn, "userAccountControl", no_preauth_uac)
+
+        mach_name = "mskilemac"
+        (mach_creds, _) = self.create_account(
+            mach_name, machine_account=True)
+
+        # Initial AS-REQ naming the client by the alternate name; as
+        # UF_DONT_REQUIRE_PREAUTH is set we should get a valid AS-REP
+        # straight away.
+        etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[alt_name])
+        krbtgt = self.PrincipalName_create(
+            name_type=NT_SRV_INST, names=["krbtgt", realm])
+
+        as_rep = self.as_req(client, krbtgt, realm, etypes)
+        self.check_as_reply(as_rep)
+
+        # Build the reply key from the password, salted with the
+        # upper-cased realm followed by the account name (not the
+        # alternate name).
+        salt = "%s%s" % (realm.upper(), user_name)
+        reply_key = self.PasswordKey_create(
+            as_rep['enc-part']['etype'],
+            user_creds.get_password(),
+            salt.encode('UTF8'),
+            as_rep['enc-part']['kvno'])
+
+        # Request a ticket to the machine account, using the ticket and
+        # key returned in the AS reply.
+        tgt = as_rep['ticket']
+        as_enc = self.get_as_rep_enc_data(reply_key, as_rep)
+        session_key = self.EncryptionKey_import(as_enc['key'])
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[alt_name])
+        service = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[mach_creds.get_username()])
+
+        (tgs_rep, _) = self.tgs_req(
+            client, service, user_creds.get_realm(),
+            tgt, session_key, etypes)
+        self.check_tgs_reply(tgs_rep)
+
+        # Decode the service ticket with the machine account credentials.
+        service_ticket = tgs_rep['ticket']
+        ticket_enc = self.decode_service_ticket(mach_creds, service_ticket)
+
+        # We get an empty authorization-data element in the ticket,
+        # i.e. no PAC.
+        self.assertEqual([], ticket_enc['authorization-data'])
+
+        # The ticket must name the client by the alternate name, in the
+        # upper-cased realm.
+        ticket_cname = ticket_enc['cname']
+        self.assertEqual(NT_PRINCIPAL, ticket_cname['name-type'])
+        self.assertEqual(
+            alt_name.encode('UTF8'), ticket_cname['name-string'][0])
+        self.assertEqual(
+            realm.upper().encode('UTF8'), ticket_enc['crealm'])
+
+
+    def test_nt_principal_step_4_b(self):
+        ''' Step 4, pre-authentication
+            If not found and pre-authentication
+                search for a matching user principal name
+        '''
+
+        # Accounts for the test: a user carrying an altSecurityIdentities
+        # value, and a machine account as the target of the service
+        # ticket.
+        user_name = "mskileusr"
+        alt_name = "mskilealtsec"
+        (user_creds, user_dn) = self.create_account(user_name)
+        realm = user_creds.get_realm().lower()
+        alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
+        self.add_attribute(user_dn, "altSecurityIdentities", alt_sec)
+
+        mach_name = "mskilemac"
+        (mach_creds, _) = self.create_account(
+            mach_name, machine_account=True)
+
+        # Initial AS-REQ naming the client by the alternate name, should
+        # get a pre-authentication required response.
+        etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[alt_name])
+        krbtgt = self.PrincipalName_create(
+            name_type=NT_SRV_INST, names=["krbtgt", realm])
+
+        as_rep = self.as_req(client, krbtgt, realm, etypes)
+        self.check_pre_authenication(as_rep)
+
+        # Second AS-REQ, with padata derived from the first reply.
+        padata = self.get_pa_data(user_creds, as_rep)
+        reply_key = self.get_as_rep_key(user_creds, as_rep)
+        # Note: although we used the alt security id for the pre-auth
+        #       we need to use the username for the auth
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[user_name])
+        as_rep = self.as_req(client, krbtgt, realm, etypes, padata=padata)
+        self.check_as_reply(as_rep)
+
+        # Request a ticket to the machine account, using the ticket and
+        # key returned in the AS reply.
+        tgt = as_rep['ticket']
+        as_enc = self.get_as_rep_enc_data(reply_key, as_rep)
+        session_key = self.EncryptionKey_import(as_enc['key'])
+        client = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[user_name])
+        service = self.PrincipalName_create(
+            name_type=NT_PRINCIPAL, names=[mach_creds.get_username()])
+
+        (tgs_rep, _) = self.tgs_req(
+            client, service, user_creds.get_realm(),
+            tgt, session_key, etypes)
+        self.check_tgs_reply(tgs_rep)
+
+        # Decode the service ticket with the machine account credentials
+        # and check the PAC it carries.
+        service_ticket = tgs_rep['ticket']
+        ticket_enc = self.decode_service_ticket(mach_creds, service_ticket)
+        self.check_pac(
+            ticket_enc['authorization-data'],
+            user_dn, user_creds, user_name)
+
+        # The ticket must name the client by user name, in the upper-cased
+        # realm.
+        ticket_cname = ticket_enc['cname']
+        self.assertEqual(NT_PRINCIPAL, ticket_cname['name-type'])
+        self.assertEqual(
+            user_name.encode('UTF8'), ticket_cname['name-string'][0])
+        self.assertEqual(
+            realm.upper().encode('UTF8'), ticket_enc['crealm'])
+
+
+ def test_nt_principal_step_4_c(self):
+ ''' Step 4, pre-authentication
+ If not found and pre-authentication
+ search for a matching user principal name
+
+ This test uses the altsecid, so the AS-REQ should fail.
+ '''
+
+ # Create user and machine accounts for the test.
+ #
+ user_name = "mskileusr"
+ alt_name = "mskilealtsec"
+ (uc, dn) = self.create_account(user_name)
+ realm = uc.get_realm().lower()
+ alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
+ self.add_attribute(dn, "altSecurityIdentities", alt_sec)
+
+ mach_name = "mskilemac"
+ (mc, _) = self.create_account(mach_name, machine_account=True)
+
+ # Do the initial AS-REQ, should get a pre-authentication required
+ # response
+ etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+ cname = self.PrincipalName_create(
+ name_type=NT_PRINCIPAL, names=[alt_name])
+ sname = self.PrincipalName_create(
+ name_type=NT_SRV_INST, names=["krbtgt", realm])
+
+ rep = self.as_req(cname, sname, realm, etype)
+ self.check_pre_authenication(rep)
+
+ # Do the next AS-REQ
+ padata = self.get_pa_data(uc, rep)
+ # Use the alternate security identifier
+ # this should fail
+ cname = self.PrincipalName_create(
+ name_type=NT_PRINCIPAL, names=[alt_sec])
+ rep = self.as_req(cname, sname, realm, etype, padata=padata)
--
Samba Shared Repository
More information about the samba-cvs
mailing list