[PATCH][TDB] tdb transactions - please review!

tridge at samba.org tridge at samba.org
Wed Sep 21 11:15:47 GMT 2005


Attached below is a first pass at a patch adding transactions to
tdb. This will be hooked into ldb to give us two things:

 - make our databases safe against system crashes
 - ensure that all ldb operations either pass or fail, but never
   'partially pass'. ldb modules will be able to safely return an
   error to cause all database changes to be rolled back

This should also be useful in Samba3, although the deeper reliance of
Samba4 on more complex databases makes it particularly important for
Samba4.

I'd appreciate some feedback on the design, and of course any bugs
that you spot! I've tried to be very careful in coding this, but it is
quite a intricate thing to get right so it needs careful review. It
has taken me several days and a few false starts to get this working.

See the top of transaction.c for the basic design. See the rest of
transaction.c for all the transaction related code. It relies on a
VFS-like abstraction of key calls in the tdb internals to intercept
IOs during a transaction.

The performance seems to be reasonable, but not as good as I had
hoped. Using ldbtest on my laptop I'm getting about 400 transactions
per second in ldbtest (1/3 adds, 1/3 modifies and 1/3 deletes). Using
the sqlite3 backend I'm getting about 20 transactions per second. 

Using the bdb backend in openldap with the same ldbtest I get around
150 transactions per second, so we are doing a bit better than that,
but not as good as I had hoped. The 4 fsync/msync calls per
transaction (see the patch below) really cost a lot.

You can also have transactions without the fsync/msync and recovery
code, by passing TDB_NOSYNC to the tdb_open() call. That is of course
much faster - it gets about 5000 transactions per second.

Search speed is unaffected by transactions of course, and existing ldb
and tdb files should work fine (the file format remains compatible).

Cheers, Tridge



Index: common/transaction.c
===================================================================
--- common/transaction.c	(revision 0)
+++ common/transaction.c	(revision 0)
@@ -0,0 +1,976 @@
+ /* 
+   Unix SMB/CIFS implementation.
+
+   trivial database library
+
+   Copyright (C) Andrew Tridgell              2005
+
+     ** NOTE! The following LGPL license applies to the tdb
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
+   
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "tdb_private.h"
+
+/*
+  transaction design:
+
+  - only allow a single transaction at a time per database. This makes
+    using the transaction API simpler, as otherwise the caller would
+    have to cope with temporary failures in transactions that conflict
+    with other current transactions
+
+  - keep the transaction recovery information in the same file as the
+    database, using a special 'transaction recovery' record pointed at
+    by the header. This removes the need for extra journal files as
+    used by some other databases
+
+  - dymacially allocated the transaction recover record, re-using it
+    for subsequent transactions. If a larger record is needed then
+    tdb_free() the old record to place it on the normal tdb freelist
+    before allocating the new record
+
+  - during transactions, keep a linked list of writes all that have
+    been performed by intercepting all tdb_write() calls. The hooked
+    transaction versions of tdb_read() and tdb_write() check this
+    linked list and try to use the elements of the list in preference
+    to the real database.
+
+  - don't allow any locks to be held when a transaction starts,
+    otherwise we can end up with deadlock (plus lack of lock nesting
+    in posix locks would mean the lock is lost)
+
+  - if the caller gains a lock during the transaction but doesn't
+    release it then fail the commit
+
+  - allow for nested calls to tdb_transaction_start(), re-using the
+    existing transaction record. If the inner transaction is cancelled
+    then a subsequent commit will fail
+ 
+  - keep a mirrored copy of the tdb hash chain heads to allow for the
+    fast hash heads scan on traverse, updating the mirrored copy in
+    the transaction version of tdb_write
+
+  - allow callers to mix transaction and non-transaction use of tdb,
+    although once a transaction is started then an exclusive lock is
+    gained until the transaction is committed or cancelled
+
+  - the commit stategy involves first saving away all modified data
+    into a linearised buffer in the transaction recovery area, then
+    marking the transaction recovery area with a magic value to
+    indicate a valid recovery record. In total 4 fsync/msync calls are
+    needed per commit to prevent race conditions. It might be possible
+    to reduce this to 3 or even 2 with some more work.
+
+  - check for a valid recovery record on open of the tdb, while the
+    global lock is held. Automatically recover from the transaction
+    recovery area if needed, then continue with the open as
+    usual. This allows for smooth crash recovery with no administrator
+    intervention.
+
+  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
+    still available, but no transaction recovery area is used and no
+    fsync/msync calls are made.
+
+*/
+
+
+/*
+  hold the context of any current transaction
+*/
+struct tdb_transaction {
+	/* we keep a mirrored copy of the tdb hash heads here so
+	   tdb_next_hash_chain() can operate efficiently */
+	u32 *hash_heads;
+
+	/* the original io methods - used to do IOs to the real db */
+	const struct tdb_methods *io_methods;
+
+	/* the list of transaction elements. We use a doubly linked
+	   list with a last pointer to allow us to keep the list
+	   ordered, with first element at the front of the list. It
+	   needs to be doubly linked as the read/write traversals need
+	   to be backwards, while the commit needs to be forwards */
+	struct tdb_transaction_el {
+		struct tdb_transaction_el *next, *prev;
+		tdb_off_t offset;
+		tdb_len_t length;
+		unsigned char *data;
+	} *elements, *elements_last;
+
+	/* non-zero when an internal transaction error has
+	   occurred. All write operations will then fail until the
+	   transaction is ended */
+	int transaction_error;
+
+	/* when inside a transaction we need to keep track of any
+	   nested tdb_transaction_start() calls, as these are allowed,
+	   but don't create a new transaction */
+	int nesting;
+
+	/* old file size before transaction */
+	tdb_len_t old_map_size;
+};
+
+
+/*
+  read while in a transaction. We need to check first if the data is in our list
+  of transaction elements, then if not do a real read
+*/
+static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
+			    tdb_len_t len, int cv)
+{
+	struct tdb_transaction_el *el;
+
+	/* we need to walk the list backwards to get the most recent data */
+	for (el=tdb->transaction->elements_last;el;el=el->prev) {
+		tdb_len_t partial;
+
+		if (off+len <= el->offset) {
+			continue;
+		}
+		if (off >= el->offset + el->length) {
+			continue;
+		}
+
+		/* an overlapping read - needs to be split into up to
+		   2 reads and a memcpy */
+		if (off < el->offset) {
+			partial = el->offset - off;
+			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
+				goto fail;
+			}
+			len -= partial;
+			off += partial;
+			buf = (void *)(partial + (char *)buf);
+		}
+		if (off + len <= el->offset + el->length) {
+			partial = len;
+		} else {
+			partial = el->offset + el->length - off;
+		}
+		memcpy(buf, el->data + (off - el->offset), partial);
+		if (cv) {
+			tdb_convert(buf, len);
+		}
+		len -= partial;
+		off += partial;
+		buf = (void *)(partial + (char *)buf);
+		
+		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
+			goto fail;
+		}
+
+		return 0;
+	}
+
+	/* its not in the transaction elements - do a real read */
+	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
+
+fail:
+	TDB_LOG((tdb, 0, "transaction_read: failed at off=%d len=%d\n", off, len));
+	tdb->ecode = TDB_ERR_IO;
+	tdb->transaction->transaction_error = 1;
+	return -1;
+}
+
+
+/*
+  write while in a transaction
+*/
+static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
+			     const void *buf, tdb_len_t len)
+{
+	struct tdb_transaction_el *el;
+	
+	/* if the write is to a hash head, then update the transaction
+	   hash heads */
+	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
+	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
+		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
+		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
+	}
+
+	/* first see if we can replace an existing entry */
+	for (el=tdb->transaction->elements_last;el;el=el->prev) {
+		tdb_len_t partial;
+
+		if (off+len <= el->offset) {
+			continue;
+		}
+		if (off >= el->offset + el->length) {
+			continue;
+		}
+
+		/* an overlapping write - needs to be split into up to
+		   2 writes and a memcpy */
+		if (off < el->offset) {
+			partial = el->offset - off;
+			if (transaction_write(tdb, off, buf, partial) != 0) {
+				goto fail;
+			}
+			len -= partial;
+			off += partial;
+			buf = (const void *)(partial + (const char *)buf);
+		}
+		if (off + len <= el->offset + el->length) {
+			partial = len;
+		} else {
+			partial = el->offset + el->length - off;
+		}
+		memcpy(el->data + (off - el->offset), buf, partial);
+		len -= partial;
+		off += partial;
+		buf = (const void *)(partial + (const char *)buf);
+		
+		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
+			goto fail;
+		}
+
+		return 0;
+	}
+
+	/* add a new entry at the end of the list */
+	el = malloc(sizeof(*el));
+	if (el == NULL) {
+		tdb->ecode = TDB_ERR_OOM;
+		tdb->transaction->transaction_error = 1;		
+		return -1;
+	}
+	el->next = NULL;
+	el->prev = tdb->transaction->elements_last;
+	el->offset = off;
+	el->length = len;
+	el->data = malloc(len);
+	if (el->data == NULL) {
+		free(el);
+		tdb->ecode = TDB_ERR_OOM;
+		tdb->transaction->transaction_error = 1;		
+		return -1;
+	}
+	if (buf) {
+		memcpy(el->data, buf, len);
+	} else {
+		memset(el->data, TDB_PAD_BYTE, len);
+	}
+	if (el->prev) {
+		el->prev->next = el;
+	} else {
+		tdb->transaction->elements = el;
+	}
+	tdb->transaction->elements_last = el;
+	return 0;
+
+fail:
+	TDB_LOG((tdb, 0, "transaction_write: failed at off=%d len=%d\n", off, len));
+	tdb->ecode = TDB_ERR_IO;
+	tdb->transaction->transaction_error = 1;
+	return -1;
+}
+
+/*
+  accelerated hash chain head search, using the cached hash heads
+*/
+static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
+{
+	u32 h = *chain;
+	for (;h < tdb->header.hash_size;h++) {
+		/* the +1 takes account of the freelist */
+		if (0 != tdb->transaction->hash_heads[h+1]) {
+			break;
+		}
+	}
+	(*chain) = h;
+}
+
+/*
+  out of bounds check during a transaction
+*/
+static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
+{
+	if (len <= tdb->map_size) {
+		return 0;
+	}
+	return TDB_ERRCODE(TDB_ERR_IO, -1);
+}
+
+/*
+  transaction version of tdb_expand().
+*/
+static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
+				   tdb_off_t addition)
+{
+	/* add a write to the transaction elements, so subsequent
+	   reads see the zero data */
+	if (transaction_write(tdb, size, NULL, addition) != 0) {
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+  brlock during a transaction - ignore them
+*/
+int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
+		       int rw_type, int lck_type, int probe)
+{
+	return 0;
+}
+
+static const struct tdb_methods transaction_methods = {
+	.tdb_read        = transaction_read,
+	.tdb_write       = transaction_write,
+	.next_hash_chain = transaction_next_hash_chain,
+	.tdb_oob         = transaction_oob,
+	.tdb_expand_file = transaction_expand_file,
+	.tdb_brlock      = transaction_brlock
+};
+
+
+/*
+  start a tdb transaction. No token is returned, as only a single
+  transaction is allowed to be pending per tdb_context
+*/
+int tdb_transaction_start(struct tdb_context *tdb)
+{
+	/* some sanity checks */
+	if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
+		TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
+		tdb->ecode = TDB_ERR_EINVAL;
+		return -1;
+	}
+
+	/* cope with nested tdb_transaction_start() calls */
+	if (tdb->transaction != NULL) {
+		tdb->transaction->nesting++;
+		TDB_LOG((tdb, 0, "tdb_transaction_start: nesting %d\n", 
+			 tdb->transaction->nesting));
+		return 0;
+	}
+
+	if (tdb->num_locks != 0) {
+		/* the caller must not have any locks when starting a
+		   transaction as otherwise we'll be screwed by lack
+		   of nested locks in posix */
+		TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction with locks held\n"));
+		tdb->ecode = TDB_ERR_LOCK;
+		return -1;
+	}
+
+	tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
+	if (tdb->transaction == NULL) {
+		tdb->ecode = TDB_ERR_OOM;
+		return -1;
+	}
+
+	/* get the transaction write lock. This is a blocking lock. As
+	   discussed with Volker, there are a number of ways we could
+	   make this async, which we will probably do in the future */
+	if (tdb_brlock_len(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get transaction lock\n"));
+		tdb->ecode = TDB_ERR_LOCK;
+		SAFE_FREE(tdb->transaction);
+		return -1;
+	}
+	
+	/* get a write lock from the freelist to the end of file. It
+	   would be much better to make this a read lock as it would
+	   increase parallelism, but it could lead to deadlocks on
+	   commit when a write lock needs to be taken. 
+
+	   TODO: look at alternative locking strategies to allow this
+	   to be a read lock 
+	*/
+	if (tdb_brlock_len(tdb, FREELIST_TOP, F_WRLCK, F_SETLKW, 0, 0) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get hash locks\n"));
+		tdb->ecode = TDB_ERR_LOCK;
+		goto fail;
+	}
+
+	/* setup a copy of the hash table heads so the hash scan in
+	   traverse can be fast */
+	tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
+	if (tdb->transaction->hash_heads == NULL) {
+		tdb->ecode = TDB_ERR_OOM;
+		goto fail;
+	}
+	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
+				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
+		TDB_LOG((tdb, 0, "tdb_transaction_start: failed to read hash heads\n"));
+		tdb->ecode = TDB_ERR_IO;
+		goto fail;
+	}
+
+	/* make sure we know about any file expansions already done by
+	   anyone else */
+	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+	tdb->transaction->old_map_size = tdb->map_size;
+
+	/* finally hook the io methods, replacing them with
+	   transaction specific methods */
+	tdb->transaction->io_methods = tdb->methods;
+	tdb->methods = &transaction_methods;
+
+	return 0;
+	
+fail:
+	tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
+	tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+	SAFE_FREE(tdb->transaction->hash_heads);
+	SAFE_FREE(tdb->transaction);
+	return -1;
+}
+
+
+/*
+  cancel the current transaction
+*/
+int tdb_transaction_cancel(struct tdb_context *tdb)
+{	
+	if (tdb->transaction == NULL) {
+		TDB_LOG((tdb, 0, "tdb_transaction_cancel: no transaction\n"));
+		return -1;
+	}
+
+	if (tdb->transaction->nesting != 0) {
+		tdb->transaction->transaction_error = 1;
+		tdb->transaction->nesting--;
+		return 0;
+	}		
+
+	tdb->map_size = tdb->transaction->old_map_size;
+
+	/* free all the transaction elements */
+	while (tdb->transaction->elements) {
+		struct tdb_transaction_el *el = tdb->transaction->elements;
+		tdb->transaction->elements = el->next;
+		free(el->data);
+		free(el);
+	}
+
+	/* remove any locks created during the transaction */
+	if (tdb->num_locks != 0) {
+		int h;
+		for (h=0;h<tdb->header.hash_size+1;h++) {
+			if (tdb->locked[h].count != 0) {
+				tdb_brlock_len(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
+				tdb->locked[h].count = 0;
+			}
+		}
+		tdb->num_locks = 0;
+	}
+
+	/* restore the normal io methods */
+	tdb->methods = tdb->transaction->io_methods;
+
+	tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
+	tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+	SAFE_FREE(tdb->transaction->hash_heads);
+	SAFE_FREE(tdb->transaction);
+	
+	return 0;
+}
+
+/*
+  sync to disk
+*/
+static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
+{	
+	if (fsync(tdb->fd) != 0) {
+		tdb->ecode = TDB_ERR_IO;
+		TDB_LOG((tdb, 0, "tdb_transaction: fsync failed\n"));
+		return -1;
+	}
+#ifdef MS_SYNC
+	if (tdb->map_ptr) {
+		tdb_off_t moffset = offset & ~(tdb->page_size-1);
+		if (msync(moffset + (char *)tdb->map_ptr, 
+			  length + (offset - moffset), MS_SYNC) != 0) {
+			tdb->ecode = TDB_ERR_IO;
+			TDB_LOG((tdb, 0, "tdb_transaction: msync failed\n"));
+			return -1;
+		}
+	}
+#endif
+	return 0;
+}
+
+
+/*
+  work out how much space the linearised recovery data will consume
+*/
+static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
+{
+	struct tdb_transaction_el *el;
+	tdb_len_t recovery_size = 0;
+
+	recovery_size = sizeof(u32);
+	for (el=tdb->transaction->elements;el;el=el->next) {
+		if (el->offset >= tdb->transaction->old_map_size) {
+			continue;
+		}
+		recovery_size += 2*sizeof(tdb_off_t) + el->length;
+	}
+
+	return recovery_size;
+}
+
+/*
+  allocate the recovery area, or use an existing recovery area if it is
+  large enough
+*/
+static int tdb_recovery_allocate(struct tdb_context *tdb, 
+				 tdb_len_t *recovery_size,
+				 tdb_off_t *recovery_offset,
+				 tdb_len_t *recovery_max_size)
+{
+	struct list_struct rec;
+	const struct tdb_methods *methods = tdb->transaction->io_methods;
+	tdb_off_t recovery_head;
+
+	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+		TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery head\n"));
+		return -1;
+	}
+
+	rec.rec_len = 0;
+
+	if (recovery_head != 0 && 
+	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
+		TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery record\n"));
+		return -1;
+	}
+
+	*recovery_size = tdb_recovery_size(tdb);
+
+	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
+		/* it fits in the existing area */
+		*recovery_max_size = rec.rec_len;
+		*recovery_offset = recovery_head;
+		return 0;
+	}
+
+	/* we need to free up the old recovery area, then allocate a
+	   new one at the end of the file. Note that we cannot use
+	   tdb_allocate() to allocate the new one as that might return
+	   us an area that is being currently used (as of the start of
+	   the transaction) */
+	if (recovery_head != 0) {
+		if (tdb_free(tdb, recovery_head, &rec) == -1) {
+			TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to free previous recovery area\n"));
+			return -1;
+		}
+	}
+
+	/* the tdb_free() call might have increased the recovery size */
+	*recovery_size = tdb_recovery_size(tdb);
+
+	/* round up to a multiple of page size */
+	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
+	*recovery_offset = tdb->map_size;
+	recovery_head = *recovery_offset;
+
+	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
+				     (tdb->map_size - tdb->transaction->old_map_size) +
+				     sizeof(rec) + *recovery_max_size) == -1) {
+		TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to create recovery area\n"));
+		return -1;
+	}
+
+	/* remap the file (if using mmap) */
+	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+
+	/* we have to reset the old map size so that we don't try to expand the file
+	   again in the transaction commit, which would destroy the recovery area */
+	tdb->transaction->old_map_size = tdb->map_size;
+
+	/* write the recovery header offset and sync - we can sync without a race here
+	   as the magic ptr in the recovery record has not been set */
+	CONVERT(recovery_head);
+	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
+			       &recovery_head, sizeof(tdb_off_t)) == -1) {
+		TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to write recovery head\n"));
+		return -1;
+	}
+
+	return 0;
+}
+
+
+/*
+  setup the recovery data that will be used on a crash during commit
+*/
+static int transaction_setup_recovery(struct tdb_context *tdb, 
+				      tdb_off_t *magic_offset)
+{
+	struct tdb_transaction_el *el;
+	tdb_len_t recovery_size;
+	unsigned char *data, *p;
+	const struct tdb_methods *methods = tdb->transaction->io_methods;
+	struct list_struct *rec;
+	tdb_off_t recovery_offset, recovery_max_size;
+	tdb_off_t old_map_size = tdb->transaction->old_map_size;
+	u32 magic;
+
+	/*
+	  check that the recovery area has enough space
+	*/
+	if (tdb_recovery_allocate(tdb, &recovery_size, 
+				  &recovery_offset, &recovery_max_size) == -1) {
+		return -1;
+	}
+
+	data = malloc(recovery_size + sizeof(*rec));
+	if (data == NULL) {
+		tdb->ecode = TDB_ERR_OOM;
+		return -1;
+	}
+
+	rec = (struct list_struct *)data;
+	memset(rec, 0, sizeof(*rec));
+
+	rec->magic    = 0;
+	rec->data_len = recovery_size;
+	rec->rec_len  = recovery_max_size;
+	rec->key_len  = old_map_size;
+	CONVERT(rec);
+
+	/* build the recovery data into a single blob to allow us to do a single
+	   large write, which should be more efficient */
+	p = data + sizeof(*rec);
+	for (el=tdb->transaction->elements;el;el=el->next) {
+		if (el->offset >= old_map_size) {
+			continue;
+		}
+		if (el->offset + el->length > tdb->transaction->old_map_size) {
+			TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction data over new region boundary\n"));
+			free(data);
+			tdb->ecode = TDB_ERR_CORRUPT;
+			return -1;
+		}
+		((u32 *)p)[0] = el->offset;
+		((u32 *)p)[1] = el->length;
+		if (DOCONV()) {
+			tdb_convert(p, 8);
+		}
+		/* the recovery area contains the old data, not the
+		   new data, so we have to call the original tdb_read
+		   method to get it */
+		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
+			free(data);
+			tdb->ecode = TDB_ERR_IO;
+			return -1;
+		}
+		p += 8 + el->length;
+	}
+
+	/* and the tailer */
+	*(u32 *)p = sizeof(*rec) + recovery_max_size;
+	CONVERT(p);
+
+	/* write the recovery data to the recovery area */
+	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery data\n"));
+		free(data);
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	/* as we don't have ordered writes, we have to sync the recovery
+	   data before we update the magic to indicate that the recovery
+	   data is present */
+	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
+		free(data);
+		return -1;
+	}
+
+	free(data);
+
+	magic = TDB_RECOVERY_MAGIC;
+	CONVERT(magic);
+
+	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
+
+	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery magic\n"));
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	/* ensure the recovery magic marker is on disk */
+	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+  commit the current transaction
+*/
+int tdb_transaction_commit(struct tdb_context *tdb)
+{	
+	const struct tdb_methods *methods;
+	tdb_off_t magic_offset;
+	u32 zero = 0;
+
+	if (tdb->transaction == NULL) {
+		TDB_LOG((tdb, 0, "tdb_transaction_commit: no transaction\n"));
+		return -1;
+	}
+
+	if (tdb->transaction->transaction_error) {
+		tdb->ecode = TDB_ERR_IO;
+		tdb_transaction_cancel(tdb);
+		TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction error pending\n"));
+		return -1;
+	}
+
+	if (tdb->transaction->nesting != 0) {
+		tdb->transaction->nesting--;
+		return 0;
+	}		
+
+	/* check for a null transaction */
+	if (tdb->transaction->elements == NULL) {
+		tdb_transaction_cancel(tdb);
+		return 0;
+	}
+
+	methods = tdb->transaction->io_methods;
+	
+	/* if there are any locks pending then the caller has not
+	   nested their locks properly, so fail the transaction */
+	if (tdb->num_locks) {
+		tdb->ecode = TDB_ERR_LOCK;
+		TDB_LOG((tdb, 0, "tdb_transaction_commit: locks pending on commit\n"));
+		tdb_transaction_cancel(tdb);
+		return -1;
+	}
+
+	/* get the global lock - this prevents new users attaching to the database
+	   during the commit */
+	if (tdb_brlock_len(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to get global lock\n"));
+		tdb->ecode = TDB_ERR_LOCK;
+		tdb_transaction_cancel(tdb);
+		return -1;
+	}
+
+	if (!(tdb->flags & TDB_NOSYNC)) {
+		/* write the recovery data to the end of the file */
+		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
+			TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to setup recovery data\n"));
+			tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+			tdb_transaction_cancel(tdb);
+			return -1;
+		}
+	}
+
+	/* expand the file to the new size if needed */
+	if (tdb->map_size != tdb->transaction->old_map_size) {
+		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
+					     tdb->map_size - 
+					     tdb->transaction->old_map_size) == -1) {
+			tdb->ecode = TDB_ERR_IO;
+			TDB_LOG((tdb, 0, "tdb_transaction_commit: expansion failed\n"));
+			tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+			tdb_transaction_cancel(tdb);
+			return -1;
+		}
+		tdb->map_size = tdb->transaction->old_map_size;
+		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+	}
+
+	/* perform all the writes */
+	while (tdb->transaction->elements) {
+		struct tdb_transaction_el *el = tdb->transaction->elements;
+
+		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
+			TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed during commit\n"));
+			
+			/* we've overwritten part of the data and
+			   possibly expanded the file, so we need to
+			   run the crash recovery code */
+			tdb->methods = methods;
+			tdb_transaction_recover(tdb); 
+
+			tdb_transaction_cancel(tdb);
+			tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+
+			TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed\n"));
+			return -1;
+		}
+		tdb->transaction->elements = el->next;
+		free(el->data); 
+		free(el);
+	} 
+
+	if (!(tdb->flags & TDB_NOSYNC)) {
+		/* ensure the new data is on disk */
+		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
+			return -1;
+		}
+
+		/* remove the recovery marker */
+		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
+			TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to remove recovery magic\n"));
+			return -1;
+		}
+
+		/* ensure the recovery marker has been removed on disk */
+		if (transaction_sync(tdb, magic_offset, 4) == -1) {
+			return -1;
+		}
+	}
+
+	tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+
+	/* use a transaction cancel to free memory and remove the
+	   transaction locks */
+	tdb_transaction_cancel(tdb);
+	return 0;
+}
+
+
+/*
+  recover from an aborted transaction. Must be called with exclusive
+  database write access already established (including the global
+  lock to prevent new processes attaching)
+*/
+int tdb_transaction_recover(struct tdb_context *tdb)
+{
+	tdb_off_t recovery_head, recovery_eof;
+	unsigned char *data, *p;
+	u32 zero = 0;
+	struct list_struct rec;
+
+	/* find the recovery area */
+	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery head\n"));
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	if (recovery_head == 0) {
+		/* we have never allocated a recovery record */
+		return 0;
+	}
+
+	/* read the recovery record */
+	if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
+				   sizeof(rec), DOCONV()) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery record\n"));		
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	if (rec.magic != TDB_RECOVERY_MAGIC) {
+		/* there is no valid recovery data */
+		return 0;
+	}
+
+	if (tdb->read_only) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: attempt to recover read only database\n"));
+		tdb->ecode = TDB_ERR_CORRUPT;
+		return -1;
+	}
+
+	recovery_eof = rec.key_len;
+
+	data = malloc(rec.data_len);
+	if (data == NULL) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to allocate recovery data\n"));		
+		tdb->ecode = TDB_ERR_OOM;
+		return -1;
+	}
+
+	/* read the full recovery data */
+	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
+				   rec.data_len, 0) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery data\n"));		
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	/* recover the file data */
+	p = data;
+	while (p+8 < data + rec.data_len) {
+		u32 ofs, len;
+		if (DOCONV()) {
+			tdb_convert(p, 8);
+		}
+		ofs = ((u32 *)p)[0];
+		len = ((u32 *)p)[1];
+
+		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
+			free(data);
+			TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
+			tdb->ecode = TDB_ERR_IO;
+			return -1;
+		}
+		p += 8 + len;
+	}
+
+	free(data);
+
+	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync recovery\n"));
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	/* if the recovery area is after the recovered eof then remove it */
+	if (recovery_eof <= recovery_head) {
+		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
+			TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery head\n"));
+			tdb->ecode = TDB_ERR_IO;
+			return -1;			
+		}
+	}
+
+	/* remove the recovery magic */
+	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
+			  &zero) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery magic\n"));
+		tdb->ecode = TDB_ERR_IO;
+		return -1;			
+	}
+	
+	/* reduce the file size to the old size */
+	tdb_munmap(tdb);
+	if (ftruncate(tdb->fd, recovery_eof) != 0) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to reduce to recovery size\n"));
+		tdb->ecode = TDB_ERR_IO;
+		return -1;			
+	}
+	tdb->map_size = recovery_eof;
+	tdb_mmap(tdb);
+
+	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
+		TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync2 recovery\n"));
+		tdb->ecode = TDB_ERR_IO;
+		return -1;
+	}
+
+	TDB_LOG((tdb, 0, "tdb_transaction_recover: recovered %d byte database\n", 
+		 recovery_eof));
+
+	/* all done */
+	return 0;
+}
Index: tools/tdbtorture.c
===================================================================
--- tools/tdbtorture.c	(revision 10253)
+++ tools/tdbtorture.c	(working copy)
@@ -1,7 +1,9 @@
 /* this tests tdb by doing lots of ops from several simultaneous
-   writers - that stresses the locking code. Build with TDB_DEBUG=1
-   for best effect */
+   writers - that stresses the locking code. 
+*/
 
+#define _GNU_SOURCE
+
 #ifndef _SAMBA_BUILD_
 #include <stdlib.h>
 #include <time.h>
@@ -28,11 +30,14 @@
 
 #endif
 
+#include <getopt.h>
+
 #define REOPEN_PROB 30
 #define DELETE_PROB 8
 #define STORE_PROB 4
 #define APPEND_PROB 6
-#define LOCKSTORE_PROB 0
+#define TRANSACTION_PROB 10
+#define LOCKSTORE_PROB 5
 #define TRAVERSE_PROB 20
 #define CULL_PROB 100
 #define KEYLEN 3
@@ -40,6 +45,7 @@
 #define LOCKLEN 20
 
 static struct tdb_context *db;
+static int in_transaction;
 
 #ifdef PRINTF_ATTRIBUTE
 static void tdb_log(struct tdb_context *tdb, int level, const char *format, ...) PRINTF_ATTRIBUTE(3,4);
@@ -84,25 +90,25 @@
 static int cull_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
 			 void *state)
 {
+#if CULL_PROB
 	if (random() % CULL_PROB == 0) {
 		tdb_delete(tdb, key);
 	}
+#endif
 	return 0;
 }
 
 static void addrec_db(void)
 {
-	int klen, dlen, slen;
-	char *k, *d, *s;
-	TDB_DATA key, data, lockkey;
+	int klen, dlen;
+	char *k, *d;
+	TDB_DATA key, data;
 
 	klen = 1 + (rand() % KEYLEN);
 	dlen = 1 + (rand() % DATALEN);
-	slen = 1 + (rand() % LOCKLEN);
 
 	k = randbuf(klen);
 	d = randbuf(dlen);
-	s = randbuf(slen);
 
 	key.dptr = (unsigned char *)k;
 	key.dsize = klen+1;
@@ -110,11 +116,32 @@
 	data.dptr = (unsigned char *)d;
 	data.dsize = dlen+1;
 
-	lockkey.dptr = (unsigned char *)s;
-	lockkey.dsize = slen+1;
+#if TRANSACTION_PROB
+	if (in_transaction == 0 && random() % TRANSACTION_PROB == 0) {
+		if (tdb_transaction_start(db) != 0) {
+			fatal("tdb_transaction_start failed");
+		}
+		in_transaction++;
+		goto next;
+	}
+	if (in_transaction && random() % TRANSACTION_PROB == 0) {
+		if (tdb_transaction_commit(db) != 0) {
+			fatal("tdb_transaction_commit failed");
+		}
+		in_transaction--;
+		goto next;
+	}
+	if (in_transaction && random() % TRANSACTION_PROB == 0) {
+		if (tdb_transaction_cancel(db) != 0) {
+			fatal("tdb_transaction_cancel failed");
+		}
+		in_transaction--;
+		goto next;
+	}
+#endif
 
 #if REOPEN_PROB
-	if (random() % REOPEN_PROB == 0) {
+	if (in_transaction == 0 && random() % REOPEN_PROB == 0) {
 		tdb_reopen_all();
 		goto next;
 	} 
@@ -147,13 +174,13 @@
 
 #if LOCKSTORE_PROB
 	if (random() % LOCKSTORE_PROB == 0) {
-		tdb_chainlock(db, lockkey);
+		tdb_chainlock(db, key);
 		data = tdb_fetch(db, key);
 		if (tdb_store(db, key, data, TDB_REPLACE) != 0) {
 			fatal("tdb_store failed");
 		}
 		if (data.dptr) free(data.dptr);
-		tdb_chainunlock(db, lockkey);
+		tdb_chainunlock(db, key);
 		goto next;
 	} 
 #endif
@@ -171,7 +198,6 @@
 next:
 	free(k);
 	free(d);
-	free(s);
 }
 
 static int traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
@@ -181,39 +207,72 @@
 	return 0;
 }
 
-#ifndef NPROC
-#define NPROC 2
-#endif
+static void usage(void)
+{
+	printf("Usage: tdbtorture [-n NUM_PROCS] [-l NUM_LOOPS] [-s SEED] [-H HASH_SIZE]\n");
+	exit(0);
+}
 
-#ifndef NLOOPS
-#define NLOOPS 5000
-#endif
-
- int main(int argc, const char *argv[])
+ int main(int argc, char * const *argv)
 {
-	int i, seed=0;
-	int loops = NLOOPS;
-	pid_t pids[NPROC];
+	int i, seed = -1;
+	int num_procs = 2;
+	int num_loops = 5000;
+	int hash_size = 2;
+	int c;
+	extern char *optarg;
+	pid_t *pids;
 
-	pids[0] = getpid();
+	while ((c = getopt(argc, argv, "n:l:s:H:h")) != -1) {
+		switch (c) {
+		case 'n':
+			num_procs = strtol(optarg, NULL, 0);
+			break;
+		case 'l':
+			num_loops = strtol(optarg, NULL, 0);
+			break;
+		case 'H':
+			hash_size = strtol(optarg, NULL, 0);
+			break;
+		case 's':
+			seed = strtol(optarg, NULL, 0);
+			break;
+		default:
+			usage();
+		}
+	}
 
 	unlink("torture.tdb");
 
-	for (i=0;i<NPROC-1;i++) {
+	pids = calloc(sizeof(pid_t), num_procs);
+	pids[0] = getpid();
+
+	for (i=0;i<num_procs-1;i++) {
 		if ((pids[i+1]=fork()) == 0) break;
 	}
 
-	db = tdb_open("torture.tdb", 2, TDB_CLEAR_IF_FIRST, 
-		      O_RDWR | O_CREAT, 0600);
+	db = tdb_open_ex("torture.tdb", hash_size, TDB_CLEAR_IF_FIRST, 
+			 O_RDWR | O_CREAT, 0600, tdb_log, NULL);
 	if (!db) {
 		fatal("db open failed");
 	}
-	tdb_logging_function(db, tdb_log);
 
-	srand(seed + getpid());
-	srandom(seed + getpid() + time(NULL));
-	for (i=0;i<loops;i++) addrec_db();
+	if (seed == -1) {
+		seed = (getpid() + time(NULL)) & 0x7FFFFFFF;
+	}
 
+	if (i == 0) {
+		printf("testing with %d processes, %d loops, %d hash_size, seed=%d\n", 
+		       num_procs, num_loops, hash_size, seed);
+	}
+
+	srand(seed + i);
+	srandom(seed + i);
+
+	for (i=0;i<num_loops;i++) {
+		addrec_db();
+	}
+
 	tdb_traverse(db, NULL, NULL);
 	tdb_traverse(db, traverse_fn, NULL);
 	tdb_traverse(db, traverse_fn, NULL);
@@ -221,7 +280,7 @@
 	tdb_close(db);
 
 	if (getpid() == pids[0]) {
-		for (i=0;i<NPROC-1;i++) {
+		for (i=0;i<num_procs-1;i++) {
 			int status;
 			if (waitpid(pids[i+1], &status, 0) != pids[i+1]) {
 				printf("failed to wait for %d\n",
Index: tools/tdbtool.c
===================================================================
--- tools/tdbtool.c	(revision 10253)
+++ tools/tdbtool.c	(working copy)
@@ -34,6 +34,7 @@
 #include <sys/time.h>
 #include <ctype.h>
 #include <signal.h>
+#include <stdarg.h>
 #include "tdb.h"
 
 /* a tdb tool for manipulating a tdb database */
@@ -77,6 +78,19 @@
 		printf("%c",isprint(buf[i])?buf[i]:'.');
 }
 
+#ifdef PRINTF_ATTRIBUTE
+static void tdb_log(struct tdb_context *t, int level, const char *format, ...) PRINTF_ATTRIBUTE(3,4);
+#endif
+static void tdb_log(struct tdb_context *t, int level, const char *format, ...)
+{
+	va_list ap;
+    
+	va_start(ap, format);
+	vfprintf(stdout, format, ap);
+	va_end(ap);
+	fflush(stdout);
+}
+
 static void print_data(unsigned char *buf,int len)
 {
 	int i=0;
@@ -131,7 +145,7 @@
 "\n");
 }
 
-static void terror(char *why)
+static void terror(const char *why)
 {
 	printf("%s\n", why);
 }
@@ -175,8 +189,8 @@
 		return;
 	}
 	if (tdb) tdb_close(tdb);
-	tdb = tdb_open(tok, 0, TDB_CLEAR_IF_FIRST,
-		       O_RDWR | O_CREAT | O_TRUNC, 0600);
+	tdb = tdb_open_ex(tok, 0, TDB_CLEAR_IF_FIRST,
+			  O_RDWR | O_CREAT | O_TRUNC, 0600, tdb_log, NULL);
 	if (!tdb) {
 		printf("Could not create %s: %s\n", tok, strerror(errno));
 	}
@@ -190,7 +204,7 @@
 		return;
 	}
 	if (tdb) tdb_close(tdb);
-	tdb = tdb_open(tok, 0, 0, O_RDWR, 0600);
+	tdb = tdb_open_ex(tok, 0, 0, O_RDWR, 0600, tdb_log, NULL);
 	if (!tdb) {
 		printf("Could not open %s: %s\n", tok, strerror(errno));
 	}
@@ -326,7 +340,7 @@
 	
 	print_rec(tdb, key, dbuf, NULL);
 	
-	dst_tdb = tdb_open(file, 0, 0, O_RDWR, 0600);
+	dst_tdb = tdb_open_ex(file, 0, 0, O_RDWR, 0600, tdb_log, NULL);
 	if ( !dst_tdb ) {
 		terror("unable to open destination tdb");
 		return;
@@ -377,7 +391,7 @@
 		printf("%d records totalling %d bytes\n", count, total_bytes);
 }
 
-static char *tdb_getline(char *prompt)
+static char *tdb_getline(const char *prompt)
 {
 	static char line[1024];
 	char *p;
Index: Makefile.in
===================================================================
--- Makefile.in	(revision 10253)
+++ Makefile.in	(working copy)
@@ -2,7 +2,7 @@
 # Makefile for tdb directory
 #
 
-CFLAGS = -DTDB_DEBUG -g -Iinclude
+CFLAGS = -Iinclude @CFLAGS@
 CC = @CC@
 prefix = @prefix@
 exec_prefix = @exec_prefix@
@@ -12,7 +12,8 @@
 
 PROGS = bin/tdbtool bin/tdbtorture
 TDB_OBJ = common/tdb.o common/dump.o common/io.o common/lock.o \
-	common/open.o common/traverse.o common/freelist.o common/error.o
+	common/open.o common/traverse.o common/freelist.o common/error.o \
+	common/transaction.o
 
 all: $(PROGS)
 
Index: include/tdb.h
===================================================================
--- include/tdb.h	(revision 10253)
+++ include/tdb.h	(working copy)
@@ -45,13 +45,14 @@
 #define TDB_NOMMAP   8 /* don't use mmap */
 #define TDB_CONVERT 16 /* convert endian (internal use) */
 #define TDB_BIGENDIAN 32 /* header is big-endian (internal use) */
+#define TDB_NOSYNC   64 /* don't use synchronous transactions */
 
 #define TDB_ERRCODE(code, ret) ((tdb->ecode = (code)), ret)
 
 /* error codes */
 enum TDB_ERROR {TDB_SUCCESS=0, TDB_ERR_CORRUPT, TDB_ERR_IO, TDB_ERR_LOCK, 
 		TDB_ERR_OOM, TDB_ERR_EXISTS, TDB_ERR_NOLOCK, TDB_ERR_LOCK_TIMEOUT,
-		TDB_ERR_NOEXIST};
+		TDB_ERR_NOEXIST, TDB_ERR_EINVAL};
 
 typedef struct TDB_DATA {
 	unsigned char *dptr;
@@ -59,8 +60,16 @@
 } TDB_DATA;
 
 #ifndef PRINTF_ATTRIBUTE
-#define PRINTF_ATTRIBUTE(a,b)
+#if (__GNUC__ >= 3)
+/** Use gcc attribute to check printf fns.  a1 is the 1-based index of
+ * the parameter containing the format, and a2 the index of the first
+ * argument. Note that some gcc 2.x versions don't handle this
+ * properly **/
+#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
+#else
+#define PRINTF_ATTRIBUTE(a1, a2)
 #endif
+#endif
 
 /* this is the context structure that is returned from a db open */
 typedef struct tdb_context TDB_CONTEXT;
@@ -95,6 +104,10 @@
 const char *tdb_name(struct tdb_context *tdb);
 int tdb_fd(struct tdb_context *tdb);
 tdb_log_func tdb_log_fn(struct tdb_context *tdb);
+int tdb_transaction_start(struct tdb_context *tdb);
+int tdb_transaction_commit(struct tdb_context *tdb);
+int tdb_transaction_cancel(struct tdb_context *tdb);
+int tdb_transaction_recover(struct tdb_context *tdb);
 
 /* Low level locking functions: use with care */
 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key);
Index: include/tdbconfig.h.in
===================================================================
--- include/tdbconfig.h.in	(revision 10253)
+++ include/tdbconfig.h.in	(working copy)
@@ -17,3 +17,7 @@
 
 /* Define if you have the <unistd.h> header file.  */
 #undef HAVE_UNISTD_H
+
+/* Pull in GNU extensions */
+#undef _GNU_SOURCE
+
Index: common/open.c
===================================================================
--- common/open.c	(revision 10253)
+++ common/open.c	(working copy)
@@ -144,6 +144,7 @@
 		errno = ENOMEM;
 		goto fail;
 	}
+	tdb_io_init(tdb);
 	tdb->fd = -1;
 	tdb->name = NULL;
 	tdb->map_ptr = NULL;
@@ -152,6 +153,12 @@
 	tdb->log_fn = log_fn?log_fn:null_log_fn;
 	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
 
+	/* cache the page size */
+	tdb->page_size = getpagesize();
+	if (tdb->page_size <= 0) {
+		tdb->page_size = 0x2000;
+	}
+
 	if ((open_flags & O_ACCMODE) == O_WRONLY) {
 		TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
 			 name));
@@ -186,7 +193,7 @@
 	}
 
 	/* ensure there is only one process initialising at once */
-	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
+	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
 		TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
 			 name, strerror(errno)));
 		goto fail;	/* errno set by tdb_brlock */
@@ -194,7 +201,7 @@
 
 	/* we need to zero database if we are the only one with it open */
 	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
-	    (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
+	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
 		open_flags |= O_CREAT;
 		if (ftruncate(tdb->fd, 0) == -1) {
 			TDB_LOG((tdb, 0, "tdb_open_ex: "
@@ -260,7 +267,7 @@
 	}
 	tdb_mmap(tdb);
 	if (locked) {
-		if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
+		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
 			TDB_LOG((tdb, 0, "tdb_open_ex: "
 				 "failed to take ACTIVE_LOCK on %s: %s\n",
 				 name, strerror(errno)));
@@ -275,15 +282,20 @@
 
 	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
 		/* leave this lock in place to indicate it's in use */
-		if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
+		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
 			goto fail;
 	}
 
+	/* if needed, run recovery */
+	if (tdb_transaction_recover(tdb) == -1) {
+		goto fail;
+	}
+
  internal:
 	/* Internal (memory-only) databases skip all the code above to
 	 * do with disk files, and resume here by releasing their
 	 * global lock and hooking into the active list. */
-	if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
+	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
 		goto fail;
 	tdb->next = tdbs;
 	tdbs = tdb;
@@ -322,6 +334,10 @@
 	struct tdb_context **i;
 	int ret = 0;
 
+	if (tdb->transaction) {
+		tdb_transaction_cancel(tdb);
+	}
+
 	if (tdb->map_ptr) {
 		if (tdb->flags & TDB_INTERNAL)
 			SAFE_FREE(tdb->map_ptr);
@@ -360,8 +376,20 @@
 {
 	struct stat st;
 
-	if (tdb->flags & TDB_INTERNAL)
+	if (tdb->flags & TDB_INTERNAL) {
 		return 0; /* Nothing to do. */
+	}
+
+	if (tdb->num_locks != 0) {
+		TDB_LOG((tdb, 0, "tdb_reopen: reopen not allowed with locks held\n"));
+		goto fail;
+	}
+
+	if (tdb->transaction != 0) {
+		TDB_LOG((tdb, 0, "tdb_reopen: reopen not allowed inside a transaction\n"));
+		goto fail;
+	}
+
 	if (tdb_munmap(tdb) != 0) {
 		TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
 		goto fail;
@@ -374,7 +402,7 @@
 		goto fail;
 	}
 	if ((tdb->flags & TDB_CLEAR_IF_FIRST) && 
-	    (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
+	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
 		TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
 		goto fail;
 	}
Index: common/dump.c
===================================================================
--- common/dump.c	(revision 10253)
+++ common/dump.c	(working copy)
@@ -33,7 +33,8 @@
 	struct list_struct rec;
 	tdb_off_t tailer_ofs, tailer;
 
-	if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
+	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec, 
+				   sizeof(rec), DOCONV()) == -1) {
 		printf("ERROR: failed to read record at %u\n", offset);
 		return 0;
 	}
@@ -107,7 +108,8 @@
 
 	printf("freelist top=[0x%08x]\n", rec_ptr );
 	while (rec_ptr) {
-		if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
+		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec, 
+					   sizeof(rec), DOCONV()) == -1) {
 			tdb_unlock(tdb, -1, F_WRLCK);
 			return -1;
 		}
Index: common/io.c
===================================================================
--- common/io.c	(revision 10253)
+++ common/io.c	(working copy)
@@ -56,7 +56,7 @@
    if necessary 
    note that "len" is the minimum length needed for the db
 */
-int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
+static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
 {
 	struct stat st;
 	if (len <= tdb->map_size)
@@ -94,9 +94,10 @@
 }
 
 /* write a lump of data at a specified offset */
-int tdb_write(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len)
+static int tdb_write(struct tdb_context *tdb, tdb_off_t off, 
+		     const void *buf, tdb_len_t len)
 {
-	if (tdb_oob(tdb, off + len, 0) != 0)
+	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
 		return -1;
 
 	if (tdb->map_ptr) {
@@ -122,11 +123,10 @@
 
 
 /* read a lump of data at a specified offset, maybe convert */
-int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len, int cv)
+static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
+		    tdb_len_t len, int cv)
 {
-	ssize_t ret;
-
-	if (tdb_oob(tdb, off + len, 0) != 0) {
+	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
 		return -1;
 	}
 
@@ -142,8 +142,9 @@
 			return TDB_ERRCODE(TDB_ERR_IO, -1);
 		}
 	}
-	if (cv)
+	if (cv) {
 		tdb_convert(buf, len);
+	}
 	return 0;
 }
 
@@ -153,7 +154,7 @@
   do an unlocked scan of the hash table heads to find the next non-zero head. The value
   will then be confirmed with the lock held
 */		
-void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
+static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
 {
 	u32 h = *chain;
 	if (tdb->map_ptr) {
@@ -232,9 +233,10 @@
 		}
 	}
 
-	/* now fill the file with something. This ensures that the file isn't sparse, which would be
-	   very bad if we ran out of disk. This must be done with write, not via mmap */
-	memset(buf, 0x42, sizeof(buf));
+	/* now fill the file with something. This ensures that the
+	   file isn't sparse, which would be very bad if we ran out of
+	   disk. This must be done with write, not via mmap */
+	memset(buf, TDB_PAD_BYTE, sizeof(buf));
 	while (addition) {
 		int n = addition>sizeof(buf)?sizeof(buf):addition;
 		int ret = pwrite(tdb->fd, buf, n, size);
@@ -263,11 +265,11 @@
 	}
 
 	/* must know about any previous expansions by another process */
-	tdb_oob(tdb, tdb->map_size + 1, 1);
+	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
 	/* always make room for at least 10 more records, and round
-           the database up to a multiple of TDB_PAGE_SIZE */
-	size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
+           the database up to a multiple of the page size */
+	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
 
 	if (!(tdb->flags & TDB_INTERNAL))
 		tdb_munmap(tdb);
@@ -280,7 +282,7 @@
 
 	/* expand the file itself */
 	if (!(tdb->flags & TDB_INTERNAL)) {
-		if (tdb_expand_file(tdb, tdb->map_size, size) != 0)
+		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
 			goto fail;
 	}
 
@@ -323,13 +325,13 @@
 /* read/write a tdb_off_t */
 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
 {
-	return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
+	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
 }
 
 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
 {
 	tdb_off_t off = *d;
-	return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
+	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
 }
 
 
@@ -345,7 +347,7 @@
 			   len, strerror(errno)));
 		return TDB_ERRCODE(TDB_ERR_OOM, buf);
 	}
-	if (tdb_read(tdb, offset, buf, len, 0) == -1) {
+	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
 		SAFE_FREE(buf);
 		return NULL;
 	}
@@ -355,7 +357,7 @@
 /* read/write a record */
 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
 {
-	if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
+	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
 		return -1;
 	if (TDB_BAD_MAGIC(rec)) {
 		/* Ensure ecode is set for log fn. */
@@ -363,12 +365,28 @@
 		TDB_LOG((tdb, 0,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
 		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
 	}
-	return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
+	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
 }
 
 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
 {
 	struct list_struct r = *rec;
-	return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
+	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
 }
 
+static const struct tdb_methods io_methods = {
+	.tdb_read        = tdb_read,
+	.tdb_write       = tdb_write,
+	.next_hash_chain = tdb_next_hash_chain,
+	.tdb_oob         = tdb_oob,
+	.tdb_expand_file = tdb_expand_file,
+	.tdb_brlock      = tdb_brlock
+};
+
+/*
+  initialise the default methods table
+*/
+void tdb_io_init(struct tdb_context *tdb)
+{
+	tdb->methods = &io_methods;
+}
Index: common/lock.c
===================================================================
--- common/lock.c	(revision 10253)
+++ common/lock.c	(working copy)
@@ -32,9 +32,12 @@
    this functions locks/unlocks 1 byte at the specified offset.
 
    On error, errno is also set so that errors are passed back properly
-   through tdb_open(). */
-int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
-	       int rw_type, int lck_type, int probe)
+   through tdb_open(). 
+
+   note that a len of zero means lock to end of file
+*/
+int tdb_brlock_len(struct tdb_context *tdb, tdb_off_t offset, 
+		   int rw_type, int lck_type, int probe, size_t len)
 {
 	struct flock fl;
 	int ret;
@@ -49,7 +52,7 @@
 	fl.l_type = rw_type;
 	fl.l_whence = SEEK_SET;
 	fl.l_start = offset;
-	fl.l_len = 1;
+	fl.l_len = len;
 	fl.l_pid = 0;
 
 	do {
@@ -76,6 +79,18 @@
 	return 0;
 }
 
+
+/* a byte range locking function - return 0 on success
+   this functions locks/unlocks 1 byte at the specified offset.
+
+   On error, errno is also set so that errors are passed back properly
+   through tdb_open(). */
+int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
+	       int rw_type, int lck_type, int probe)
+{
+	return tdb_brlock_len(tdb, offset, rw_type, lck_type, probe, 1);
+}
+
 /* lock a list in the database. list -1 is the alloc list */
 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
 {
@@ -90,12 +105,13 @@
 	/* Since fcntl locks don't nest, we do a lock for the first one,
 	   and simply bump the count for future ones */
 	if (tdb->locked[list+1].count == 0) {
-		if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
+		if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
 			TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
 				 list, ltype, strerror(errno)));
 			return -1;
 		}
 		tdb->locked[list+1].ltype = ltype;
+		tdb->num_locks++;
 	}
 	tdb->locked[list+1].count++;
 	return 0;
@@ -124,7 +140,8 @@
 
 	if (tdb->locked[list+1].count == 1) {
 		/* Down to last nested lock: unlock underneath */
-		ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
+		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
+		tdb->num_locks--;
 	} else {
 		ret = 0;
 	}
@@ -194,7 +211,7 @@
 /* record lock stops delete underneath */
 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
 {
-	return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
+	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
 }
 
 /*
@@ -208,7 +225,7 @@
 	for (i = &tdb->travlocks; i; i = i->next)
 		if (i->off == off)
 			return -1;
-	return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
+	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
 }
 
 /*
@@ -217,7 +234,7 @@
 */
 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
 {
-	return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
+	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
 }
 
 /* fcntl locks don't stack: avoid unlocking someone else's */
@@ -231,5 +248,5 @@
 	for (i = &tdb->travlocks; i; i = i->next)
 		if (i->off == off)
 			count++;
-	return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
+	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
 }
Index: common/tdb_private.h
===================================================================
--- common/tdb_private.h	(revision 10253)
+++ common/tdb_private.h	(working copy)
@@ -29,6 +29,8 @@
 #include <config.h>
 #endif
 
+#define _XOPEN_SOURCE 500
+
 #include <stdlib.h>
 #include <stdio.h>
 #include <stdint.h>
@@ -56,32 +58,40 @@
 typedef u32 tdb_len_t;
 typedef u32 tdb_off_t;
 
+#ifndef offsetof
+#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
+#endif
+
 #define TDB_MAGIC_FOOD "TDB file\n"
 #define TDB_VERSION (0x26011967 + 6)
 #define TDB_MAGIC (0x26011999U)
 #define TDB_FREE_MAGIC (~TDB_MAGIC)
 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
+#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
 #define TDB_ALIGNMENT 4
 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
 #define DEFAULT_HASH_SIZE 131
-#define TDB_PAGE_SIZE 0x2000
 #define FREELIST_TOP (sizeof(struct tdb_header))
 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
+#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
 #define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
+#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
+#define TDB_PAD_BYTE 0x42
+#define TDB_PAD_U32  0x42424242
 
-
 /* NB assumes there is a local variable called "tdb" that is the
  * current context, also takes doubly-parenthesized print-style
  * argument. */
 #define TDB_LOG(x) tdb->log_fn x
 
 /* lock offsets */
-#define GLOBAL_LOCK 0
-#define ACTIVE_LOCK 4
+#define GLOBAL_LOCK      0
+#define ACTIVE_LOCK      4
+#define TRANSACTION_LOCK 8
 
 #ifndef MAP_FILE
 #define MAP_FILE 0
@@ -138,8 +148,9 @@
 	char magic_food[32]; /* for /etc/magic */
 	u32 version; /* version of the code */
 	u32 hash_size; /* number of hash entries */
-	tdb_off_t rwlocks;
-	tdb_off_t reserved[31];
+	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
+	tdb_off_t recovery_start; /* offset of transaction recovery region */
+	tdb_off_t reserved[30];
 };
 
 struct tdb_lock_type {
@@ -154,6 +165,15 @@
 };
 
 
+struct tdb_methods {
+	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
+	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
+	void (*next_hash_chain)(struct tdb_context *, u32 *);
+	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
+	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
+	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int);
+};
+
 struct tdb_context {
 	char *name; /* the name of the database */
 	void *map_ptr; /* where it is currently mapped */
@@ -171,7 +191,10 @@
 	void (*log_fn)(struct tdb_context *tdb, int level, const char *, ...) PRINTF_ATTRIBUTE(3,4); /* logging function */
 	unsigned int (*hash_fn)(TDB_DATA *key);
 	int open_flags; /* flags used in the open - needed by reopen */
+	unsigned int num_locks; /* number of chain locks held */
+	const struct tdb_methods *methods;
 	struct tdb_transaction *transaction;
+	int page_size;
 };
 
 
@@ -180,13 +203,11 @@
 */
 int tdb_munmap(struct tdb_context *tdb);
 void tdb_mmap(struct tdb_context *tdb);
-int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len, int cv);
-int tdb_write(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len);
-int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe);
 int tdb_lock(struct tdb_context *tdb, int list, int ltype);
 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
-int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe);
+int tdb_brlock_len(struct tdb_context *tdb, tdb_off_t offset, 
+		   int rw_type, int lck_type, int probe, size_t len);
 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
 int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
@@ -204,5 +225,7 @@
 unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
 			   struct list_struct *rec);
-void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain);
+void tdb_io_init(struct tdb_context *tdb);
+int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
 
+
Index: common/tdb.c
===================================================================
--- common/tdb.c	(revision 10253)
+++ common/tdb.c	(working copy)
@@ -98,7 +98,7 @@
 		return -1;
 	}
 
-	if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
+	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
 		      dbuf.dptr, dbuf.dsize) == -1)
 		return -1;
 
@@ -285,7 +285,7 @@
 
 	/* write out and point the top of the hash chain at it */
 	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
-	    || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
+	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
 	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
 		/* Need to tdb_unallocate() here */
 		goto fail;
Index: common/error.c
===================================================================
--- common/error.c	(revision 10253)
+++ common/error.c	(working copy)
@@ -42,6 +42,7 @@
 	     {TDB_ERR_OOM, "Out of memory"},
 	     {TDB_ERR_EXISTS, "Record exists"},
 	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
+	     {TDB_ERR_EINVAL, "Invalid parameter"},
 	     {TDB_ERR_NOEXIST, "Record does not exist"} };
 
 /* Error string for the last tdb error */
Index: common/traverse.c
===================================================================
--- common/traverse.c	(revision 10253)
+++ common/traverse.c	(working copy)
@@ -65,7 +65,7 @@
 			   factor of around 80 in speed on a linux 2.6.x
 			   system (testing using ldbtest).
 			*/
-			tdb_next_hash_chain(tdb, &tlock->hash);
+			tdb->methods->next_hash_chain(tdb, &tlock->hash);
 			if (tlock->hash == tdb->header.hash_size) {
 				continue;
 			}
Index: common/freelist.c
===================================================================
--- common/freelist.c	(revision 10253)
+++ common/freelist.c	(working copy)
@@ -31,7 +31,7 @@
 /* read a freelist record and check for simple errors */
 static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
 {
-	if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
+	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
 		return -1;
 
 	if (rec->magic == TDB_MAGIC) {
@@ -40,7 +40,7 @@
 		TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
 			 rec->magic, off));
 		rec->magic = TDB_FREE_MAGIC;
-		if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
+		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
 			return -1;
 	}
 
@@ -51,7 +51,7 @@
 			   rec->magic, off));
 		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
 	}
-	if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
+	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
 		return -1;
 	return 0;
 }
@@ -111,7 +111,7 @@
 	if (right + sizeof(*rec) <= tdb->map_size) {
 		struct list_struct r;
 
-		if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
+		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
 			TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
 			goto left;
 		}
@@ -138,10 +138,16 @@
 			TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
 			goto update;
 		}
+
+		/* it could be uninitialised data */
+		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
+			goto update;
+		}
+
 		left = offset - leftsize;
 
 		/* Now read in record */
-		if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
+		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
 			TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
 			goto update;
 		}




More information about the samba-technical mailing list