[SCM] Samba Shared Repository - branch master updated

Nadezhda Ivanova nivanova at samba.org
Wed Dec 15 13:33:01 MST 2010


The branch, master has been updated
       via  6bb89aa s4-tests: Added a speedtest for LDAP search operations with different accounts.
       via  aab37c3 s4-tests: Added tests for LDAP add/delete/modify using anonymous login.
       via  a53f09b s4-dsdb: Fixed incorrect LDAP return code when anonymous login is used.
      from  b3630b4 Fix bug 7866 - "net" in v3-6-test broken.

http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 6bb89aaa0d38d59ce4f0d9362822ba1c525eb203
Author: Nadezhda Ivanova <nivanova at samba.org>
Date:   Wed Dec 15 21:29:53 2010 +0200

    s4-tests: Added a speedtest for LDAP search operations with different accounts.
    
    Autobuild-User: Nadezhda Ivanova <nivanova at samba.org>
    Autobuild-Date: Wed Dec 15 21:32:09 CET 2010 on sn-devel-104

commit aab37c314671f9ad712ab03b1b1c2e6688df772d
Author: Nadezhda Ivanova <nivanova at samba.org>
Date:   Wed Dec 15 21:28:59 2010 +0200

    s4-tests: Added tests for LDAP add/delete/modify using anonymous login.

commit a53f09b9312fc08d4cdb2d94ec9119ee29b1bf84
Author: Nadezhda Ivanova <nivanova at samba.org>
Date:   Wed Dec 15 21:28:12 2010 +0200

    s4-dsdb: Fixed incorrect LDAP return code when anonymous login is used.

-----------------------------------------------------------------------

Summary of changes:
 source4/dsdb/samdb/ldb_modules/rootdse.c |    2 +-
 source4/dsdb/tests/python/acl.py         |   58 +++++++++++++++++++---
 source4/scripting/devel/speedtest.py     |   78 +++++++++++++++++++++++++----
 3 files changed, 118 insertions(+), 20 deletions(-)


Changeset truncated at 500 lines:

diff --git a/source4/dsdb/samdb/ldb_modules/rootdse.c b/source4/dsdb/samdb/ldb_modules/rootdse.c
index e7ea765..2571bc3 100644
--- a/source4/dsdb/samdb/ldb_modules/rootdse.c
+++ b/source4/dsdb/samdb/ldb_modules/rootdse.c
@@ -641,7 +641,7 @@ static int rootdse_filter_operations(struct ldb_module *module, struct ldb_reque
 		}
 	}
 	ldb_set_errstring(ldb_module_get_ctx(module), "Operation unavailable without authentication");
-	return LDB_ERR_STRONG_AUTH_REQUIRED;
+	return LDB_ERR_OPERATIONS_ERROR;
 }
 
 static int rootdse_search(struct ldb_module *module, struct ldb_request *req)
diff --git a/source4/dsdb/tests/python/acl.py b/source4/dsdb/tests/python/acl.py
index 85018b0..12f653b 100755
--- a/source4/dsdb/tests/python/acl.py
+++ b/source4/dsdb/tests/python/acl.py
@@ -6,7 +6,6 @@ import optparse
 import sys
 import base64
 import re
-
 sys.path.append("bin/python")
 import samba
 samba.ensure_external_module("testtools", "testtools")
@@ -20,7 +19,7 @@ from ldb import (
 from ldb import ERR_CONSTRAINT_VIOLATION
 from ldb import ERR_OPERATIONS_ERROR
 from ldb import Message, MessageElement, Dn
-from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
+from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
 from samba.ndr import ndr_pack, ndr_unpack
 from samba.dcerpc import security
 
@@ -67,6 +66,13 @@ class AclTests(samba.tests.TestCase):
         self.user_pass = "samba123@"
         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
         self.sd_utils = sd_utils.SDUtils(ldb)
+        #used for anonymous login
+        self.creds_tmp = Credentials()
+        self.creds_tmp.set_username("")
+        self.creds_tmp.set_password("")
+        self.creds_tmp.set_domain(creds.get_domain())
+        self.creds_tmp.set_realm(creds.get_realm())
+        self.creds_tmp.set_workstation(creds.get_workstation())
         print "baseDN: %s" % self.base_dn
 
     def get_user_dn(self, name):
@@ -134,6 +140,7 @@ class AclAddTests(AclTests):
         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
+        delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
 
     # Make sure top OU is deleted (and so everything under it)
     def assert_top_ou_deleted(self):
@@ -229,6 +236,16 @@ class AclAddTests(AclTests):
                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
         self.assertTrue(len(res) > 0)
 
+    def test_add_anonymous(self):
+        """Test add operation with anonymous user"""
+        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        try:
+            anonymous.newuser("test_add_anonymous", self.user_pass)
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+        else:
+            self.fail()
+
 #tests on ldap modify operations
 class AclModifyTests(AclTests):
 
@@ -259,6 +276,7 @@ class AclModifyTests(AclTests):
         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
+        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
 
     def test_modify_u1(self):
         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
@@ -554,6 +572,23 @@ Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
 
+    def test_modify_anonymous(self):
+        """Test modify operation with anonymous user"""
+        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        self.ldb_admin.newuser("test_anonymous", "samba123@")
+        m = Message()
+        m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
+
+        m["description"] = MessageElement("sambauser2",
+                                          FLAG_MOD_ADD,
+                                          "description")
+        try:
+            anonymous.modify(m)
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+        else:
+            self.fail()
+
 #enable these when we have search implemented
 class AclSearchTests(AclTests):
 
@@ -563,12 +598,6 @@ class AclSearchTests(AclTests):
         self.u2 = "search_u2"
         self.u3 = "search_u3"
         self.group1 = "group1"
-        self.creds_tmp = Credentials()
-        self.creds_tmp.set_username("")
-        self.creds_tmp.set_password("")
-        self.creds_tmp.set_domain(creds.get_domain())
-        self.creds_tmp.set_realm(creds.get_realm())
-        self.creds_tmp.set_workstation(creds.get_workstation())
         self.ldb_admin.newuser(self.u1, self.user_pass)
         self.ldb_admin.newuser(self.u2, self.user_pass)
         self.ldb_admin.newuser(self.u3, self.user_pass)
@@ -926,6 +955,7 @@ class AclDeleteTests(AclTests):
         super(AclDeleteTests, self).tearDown()
         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
+        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
 
     def test_delete_u1(self):
         """User is prohibited by default to delete another User object"""
@@ -965,6 +995,18 @@ class AclDeleteTests(AclTests):
                 expression="(distinguishedName=%s)" % user_dn)
         self.assertEqual(res, [])
 
+    def test_delete_anonymous(self):
+        """Test delete operation with anonymous user"""
+        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        self.ldb_admin.newuser("test_anonymous", "samba123@")
+
+        try:
+            anonymous.delete(self.get_user_dn("test_anonymous"))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+        else:
+            self.fail()
+
 #tests on ldap rename operations
 class AclRenameTests(AclTests):
 
diff --git a/source4/scripting/devel/speedtest.py b/source4/scripting/devel/speedtest.py
index 891a741..a7adfba 100755
--- a/source4/scripting/devel/speedtest.py
+++ b/source4/scripting/devel/speedtest.py
@@ -42,7 +42,7 @@ from samba.ndr import ndr_pack, ndr_unpack
 from samba.dcerpc import security
 
 from samba.auth import system_session
-from samba import gensec
+from samba import gensec, sd_utils
 from samba.samdb import SamDB
 from samba.credentials import Credentials
 import samba.tests
@@ -77,12 +77,6 @@ creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
 
 class SpeedTest(samba.tests.TestCase):
 
-    def find_basedn(self, ldb):
-        res = ldb.search(base="", expression="", scope=SCOPE_BASE,
-                         attrs=["defaultNamingContext"])
-        self.assertEquals(len(res), 1)
-        return res[0]["defaultNamingContext"][0]
-
     def find_domain_sid(self, ldb):
         res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
         return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
@@ -90,8 +84,8 @@ class SpeedTest(samba.tests.TestCase):
     def setUp(self):
         super(SpeedTest, self).setUp()
         self.ldb_admin = ldb
-        self.base_dn = self.find_basedn(self.ldb_admin)
-        self.domain_sid = self.find_domain_sid(self.ldb_admin)
+        self.base_dn = ldb.domain_dn()
+        self.domain_sid = security.dom_sid(ldb.get_domain_sid())
         self.user_pass = "samba123@"
         print "baseDN: %s" % self.base_dn
 
@@ -129,6 +123,11 @@ url: www.example.com
         for dn in dn_list:
             delete_force(self.ldb_admin, dn)
 
+class SpeedTestAddDel(SpeedTest):
+
+    def setUp(self):
+        super(SpeedTestAddDel, self).setUp()
+
     def run_bundle(self, num):
         print "\n=== Test ADD/DEL %s user objects ===\n" % num
         avg_add = Decimal("0.0")
@@ -169,6 +168,62 @@ url: www.example.com
         """
         self.run_bundle(10000)
 
+class AclSearchSpeedTest(SpeedTest):
+
+    def setUp(self):
+        super(AclSearchSpeedTest, self).setUp()
+        self.ldb_admin.newuser("acltestuser", "samba123@")
+        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
+        self.ldb_user = self.get_ldb_connection("acltestuser", "samba123@")
+        self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn("acltestuser"))
+
+    def tearDown(self):
+        super(AclSearchSpeedTest, self).tearDown()
+        delete_force(self.ldb_admin, self.get_user_dn("acltestuser"))
+
+    def run_search_bundle(self, num, _ldb):
+        print "\n=== Creating %s user objects ===\n" % num
+        self.create_bundle(num)
+        mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
+        for i in range(num):
+            self.sd_utils.dacl_add_ace("cn=speedtestuser%d,cn=Users,%s" %
+                                       (i+1, self.base_dn), mod)
+        print "\n=== %s user objects created ===\n" % num
+        print "\n=== Test search on %s user objects ===\n" % num
+        avg_search = Decimal("0.0")
+        for x in [1, 2, 3]:
+            start = time.time()
+            res = _ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
+            res_search = Decimal( str(time.time() - start) )
+            avg_search += res_search
+            print "   Attempt %s SEARCH: %.3fs" % ( x, float(res_search) )
+        print "Average Search: %.3fs" % float( Decimal(avg_search) / Decimal("3.0") )
+        self.remove_bundle(num)
+
+    def get_user_dn(self, name):
+        return "CN=%s,CN=Users,%s" % (name, self.base_dn)
+
+    def get_ldb_connection(self, target_username, target_password):
+        creds_tmp = Credentials()
+        creds_tmp.set_username(target_username)
+        creds_tmp.set_password(target_password)
+        creds_tmp.set_domain(creds.get_domain())
+        creds_tmp.set_realm(creds.get_realm())
+        creds_tmp.set_workstation(creds.get_workstation())
+        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
+                                      | gensec.FEATURE_SEAL)
+        ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
+        return ldb_target
+
+    def test_search_01000(self):
+        self.run_search_bundle(1000, self.ldb_admin)
+
+    def test_search2_01000(self):
+        # allow the user to see objects but not attributes, all attributes will be filtered out
+        mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
+        self.sd_utils.dacl_add_ace("CN=Users,%s" % self.base_dn, mod)
+        self.run_search_bundle(1000, self.ldb_user)
+
 # Important unit running information
 
 if not "://" in host:
@@ -179,7 +234,8 @@ ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp, optio
 
 runner = SubunitTestRunner()
 rc = 0
-if not runner.run(unittest.makeSuite(SpeedTest)).wasSuccessful():
+if not runner.run(unittest.makeSuite(SpeedTestAddDel)).wasSuccessful():
+    rc = 1
+if not runner.run(unittest.makeSuite(AclSearchSpeedTest)).wasSuccessful():
     rc = 1
-
 sys.exit(rc)


-- 
Samba Shared Repository


More information about the samba-cvs mailing list