svn commit: samba r26587 - in branches/SAMBA_4_0: . source/scripting/python/samba source/scripting/python/samba/tests

jelmer at samba.org jelmer at samba.org
Mon Dec 24 13:04:37 GMT 2007


Author: jelmer
Date: 2007-12-24 13:04:33 +0000 (Mon, 24 Dec 2007)
New Revision: 26587

WebSVN: http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=26587

Log:
Fix reading Samba 3 WINS database and initial work on group db, aliases and secrets.
Modified:
   branches/SAMBA_4_0/
   branches/SAMBA_4_0/source/scripting/python/samba/samba3.py
   branches/SAMBA_4_0/source/scripting/python/samba/tests/samba3.py
   branches/SAMBA_4_0/source/scripting/python/samba/upgrade.py


Changeset:

Property changes on: branches/SAMBA_4_0
___________________________________________________________________
Name: bzr:revision-info
...skipped...
Name: bzr:revision-id:v3-trunk0
...skipped...

Modified: branches/SAMBA_4_0/source/scripting/python/samba/samba3.py
===================================================================
--- branches/SAMBA_4_0/source/scripting/python/samba/samba3.py	2007-12-24 13:04:13 UTC (rev 26586)
+++ branches/SAMBA_4_0/source/scripting/python/samba/samba3.py	2007-12-24 13:04:33 UTC (rev 26587)
@@ -73,7 +73,10 @@
 
         # FIXME: Read privileges as well
 
+    def close(self):
+        self.tdb.close()
 
+
 GROUPDB_DATABASE_VERSION_V1 = 1 # native byte format.
 GROUPDB_DATABASE_VERSION_V2 = 2 # le format.
 
@@ -88,8 +91,22 @@
 class GroupMappingDatabase:
     def __init__(self, file): 
         self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
+        assert self.tdb.fetch_int32("INFO/version\x00") in (GROUPDB_DATABASE_VERSION_V1, GROUPDB_DATABASE_VERSION_V2)
 
+    def groupsids(self):
+        for k in self.tdb.keys():
+            if k.startswith(GROUP_PREFIX):
+                yield k[len(GROUP_PREFIX):].rstrip("\0")
 
+    def aliases(self):
+        for k in self.tdb.keys():
+            if k.startswith(MEMBEROF_PREFIX):
+                yield k[len(MEMBEROF_PREFIX):].rstrip("\0")
+
+    def close(self):
+        self.tdb.close()
+
+
 # High water mark keys
 HWM_GROUP = "GROUP HWM"
 HWM_USER = "USER HWM"
@@ -102,41 +119,57 @@
         self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
         assert self.tdb.fetch_int32("IDMAP_VERSION") == IDMAP_VERSION
 
+    def close(self):
+        self.tdb.close()
 
+
 class SecretsDatabase:
     def __init__(self, file):
         self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
-        self.domains = {}
-        for k, v in self.tdb.items():
-            if k == "SECRETS/AUTH_PASSWORD":
-                self.auth_password = v
-            elif k == "SECRETS/AUTH_DOMAIN":
-                self.auth_domain = v
-            elif k == "SECRETS/AUTH_USER":
-                self.auth_user = v
-            elif k.startswith("SECRETS/SID/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/DOMGUID/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/LDAP_BIND_PW/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/AFS_KEYFILE/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/MACHINE_SEC_CHANNEL_TYPE/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/MACHINE_LAST_CHANGE_TIME/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/MACHINE_PASSWORD/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/$MACHINE.ACC/"):
-                pass # FIXME
-            elif k.startswith("SECRETS/$DOMTRUST.ACC/"):
-                pass # FIXME
-            elif k == "INFO/random_seed":
-                self.random_seed = v
-            else:
-                raise "Unknown key %s in secrets database" % k
 
+    def get_auth_password(self):
+        return self.tdb.get("SECRETS/AUTH_PASSWORD")
+
+    def get_auth_domain(self):
+        return self.tdb.get("SECRETS/AUTH_DOMAIN")
+
+    def get_auth_user(self):
+        return self.tdb.get("SECRETS/AUTH_USER")
+
+    def get_dom_guid(self, host):
+        return self.tdb.get("SECRETS/DOMGUID/%s" % host)
+
+    def get_ldap_bind_pw(self, host):
+        return self.tdb.get("SECRETS/LDAP_BIND_PW/%s" % host)
+    
+    def get_afs_keyfile(self, host):
+        return self.tdb.get("SECRETS/AFS_KEYFILE/%s" % host)
+
+    def get_machine_sec_channel_type(self, host):
+        return self.tdb.get("SECRETS/MACHINE_SEC_CHANNEL_TYPE/%s" % host)
+
+    def get_machine_last_change_time(self, host):
+        return self.tdb.get("SECRETS/MACHINE_LAST_CHANGE_TIME/%s" % host)
+            
+    def get_machine_password(self, host):
+        return self.tdb.get("SECRETS/MACHINE_PASSWORD/%s" % host)
+
+    def get_machine_acc(self, host):
+        return self.tdb.get("SECRETS/$MACHINE.ACC/%s" % host)
+
+    def get_domtrust_acc(self, host):
+        return self.tdb.get("SECRETS/$DOMTRUST.ACC/%s" % host)
+
+    def get_random_seed(self):
+        return self.tdb.get("INFO/random_seed")
+
+    def get_sid(self, host):
+        return self.tdb.get("SECRETS/SID/%s" % host.upper())
+
+    def close(self):
+        self.tdb.close()
+
+
 SHARE_DATABASE_VERSION_V1 = 1
 SHARE_DATABASE_VERSION_V2 = 2
 
@@ -149,6 +182,8 @@
         secdesc = self.tdb.get("SECDESC/%s" % name)
         # FIXME: Run ndr_pull_security_descriptor
 
+    def close(self):
+        self.tdb.close()
 
 ACB_DISABLED = 0x00000001
 ACB_HOMDIRREQ = 0x00000002
@@ -190,17 +225,80 @@
     def __init__(self, file):
         pass
 
+TDBSAM_FORMAT_STRING_V0 = "ddddddBBBBBBBBBBBBddBBwdwdBwwd"
+TDBSAM_FORMAT_STRING_V1 = "dddddddBBBBBBBBBBBBddBBwdwdBwwd"
+TDBSAM_FORMAT_STRING_V2 = "dddddddBBBBBBBBBBBBddBBBwwdBwwd"
+TDBSAM_USER_PREFIX = "USER_"
 
+
 class TdbSam:
     def __init__(self, file):
         self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
+        self.version = self.tdb.fetch_uint32("INFO/version") or 0
+        assert self.version in (0, 1, 2)
 
+    def usernames(self):
+        for k in self.tdb.keys():
+            if k.startswith(TDBSAM_USER_PREFIX):
+                yield k[len(TDBSAM_USER_PREFIX):].rstrip("\0")
 
+    def close(self):
+        self.tdb.close()
+
+
+def shellsplit(text):
+    """Very simple shell-like line splitting.
+    
+    :param text: Text to split.
+    :return: List with parts of the line as strings.
+    """
+    ret = list()
+    inquotes = False
+    current = ""
+    for c in text:
+        if c == "\"":
+            inquotes = not inquotes
+        elif c in ("\t", "\n", " ") and not inquotes:
+            ret.append(current)
+            current = ""
+        else:
+            current += c
+    if current != "":
+        ret.append(current)
+    return ret
+
+
 class WinsDatabase:
     def __init__(self, file):
+        self.entries = {}
+        f = open(file, 'r')
+        assert f.readline().rstrip("\n") == "VERSION 1 0"
+        for l in f.readlines():
+            if l[0] == "#": # skip comments
+                continue
+            entries = shellsplit(l.rstrip("\n"))
+            print entries
+            name = entries[0]
+            ttl = int(entries[1])
+            i = 2
+            ips = []
+            while "." in entries[i]:
+                ips.append(entries[i])
+                i+=1
+            nb_flags = entries[i]
+            assert not name in self.entries, "Name %s exists twice" % name
+            self.entries[name] = (ttl, ips, nb_flags)
+        f.close()
+
+    def __getitem__(self, name):
+        return self.entries[name]
+
+    def __len__(self):
+        return len(self.entries)
+
+    def close(self): # for consistency
         pass
 
-
 class Samba3:
     def __init__(self, smbconfpath, libdir):
         self.smbconfpath = smbconfpath

Modified: branches/SAMBA_4_0/source/scripting/python/samba/tests/samba3.py
===================================================================
--- branches/SAMBA_4_0/source/scripting/python/samba/tests/samba3.py	2007-12-24 13:04:13 UTC (rev 26586)
+++ branches/SAMBA_4_0/source/scripting/python/samba/tests/samba3.py	2007-12-24 13:04:33 UTC (rev 26587)
@@ -18,7 +18,8 @@
 #
 
 import unittest
-from samba.samba3 import GroupMappingDatabase, Registry, PolicyDatabase
+from samba.samba3 import (GroupMappingDatabase, Registry, PolicyDatabase, SecretsDatabase, TdbSam,
+                          WinsDatabase)
 import os
 
 DATADIR=os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
@@ -59,3 +60,55 @@
     def setUp(self):
         self.groupdb = GroupMappingDatabase(os.path.join(DATADIR, "group_mapping.tdb"))
 
+    def tearDown(self):
+        self.groupdb.close()
+
+    def test_group_length(self):
+        self.assertEquals(13, len(list(self.groupdb.groupsids())))
+
+    def test_groupsids(self):
+        sids = list(self.groupdb.groupsids())
+        self.assertTrue("S-1-5-32-544" in sids)
+
+    def test_alias_length(self):
+        self.assertEquals(0, len(list(self.groupdb.aliases())))
+
+
+class SecretsDbTestCase(unittest.TestCase):
+    def setUp(self):
+        self.secretsdb = SecretsDatabase(os.path.join(DATADIR, "secrets.tdb"))
+
+    def tearDown(self):
+        self.secretsdb.close()
+
+    def test_get_sid(self):
+        self.assertTrue(self.secretsdb.get_sid("BEDWYR") is not None)
+
+
+class TdbSamTestCase(unittest.TestCase):
+    def setUp(self):
+        self.samdb = TdbSam(os.path.join(DATADIR, "passdb.tdb"))
+
+    def tearDown(self):
+        self.samdb.close()
+
+    def test_usernames(self):
+        self.assertEquals(3, len(list(self.samdb.usernames())))
+
+
+class WinsDatabaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self.winsdb = WinsDatabase(os.path.join(DATADIR, "wins.dat"))
+
+    def test_length(self):
+        self.assertEquals(22, len(self.winsdb))
+
+    def test_first_entry(self):
+        self.assertEqual((1124185120, ["192.168.1.5"], "64R"), self.winsdb["ADMINISTRATOR#03"])
+
+    def tearDown(self):
+        self.winsdb.close()
+
+# FIXME: smbpasswd
+# FIXME: idmapdb
+# FIXME: Shares

Modified: branches/SAMBA_4_0/source/scripting/python/samba/upgrade.py
===================================================================
--- branches/SAMBA_4_0/source/scripting/python/samba/upgrade.py	2007-12-24 13:04:13 UTC (rev 26586)
+++ branches/SAMBA_4_0/source/scripting/python/samba/upgrade.py	2007-12-24 13:04:33 UTC (rev 26587)
@@ -530,15 +530,8 @@
     if ldapurl is not None:
         message("Enabling Samba3 LDAP mappings for SAM database")
 
-        samdb.modify("""
-dn: @MODULES
-changetype: modify
-replace: @LIST
-@LIST: samldb,operational,objectguid,rdn_name,samba3sam
-""")
+        enable_samba3sam(samdb)
 
-        samdb.add({"dn": "@MAP=samba3sam", "@MAP_URL": ldapurl})
-
     return ret
 
 def upgrade_verify(subobj, samba3, paths, message):
@@ -551,3 +544,15 @@
         assert(len(msg) >= 1)
     
     # FIXME
+
+
+
+def enable_samba3sam(samdb):
+    samdb.modify("""
+dn: @MODULES
+changetype: modify
+replace: @LIST
+@LIST: samldb,operational,objectguid,rdn_name,samba3sam
+""")
+
+    samdb.add({"dn": "@MAP=samba3sam", "@MAP_URL": ldapurl})



More information about the samba-cvs mailing list