[PATCH] lib/tdb: TDB_TRACE support (for developers)

Rusty Russell rusty at rustcorp.com.au
Thu Aug 27 20:26:01 MDT 2009


When TDB_TRACE is defined (in tdb_private.h), verbose tracing of tdb operations is enabled.
Each tdb opened from a file writes its trace to "<name>.trace.<pid>"; these trace files can be
replayed using "replay_trace" from http://ccan.ozlabs.org/info/tdb.

The majority of this patch comes from moving the implementations of public functions into
internal _<funcname> helpers, which internal callers use to avoid double-tracing.  There should
be no additional overhead for the normal (!TDB_TRACE) case.

Note that the verbose traces compress really well with rzip.

Signed-off-by: Rusty Russell <rusty at rustcorp.com.au>
---
 lib/tdb/common/lock.c        |   34 +++++-
 lib/tdb/common/open.c        |   32 ++++++-
 lib/tdb/common/tdb.c         |  230 +++++++++++++++++++++++++++++++++++++----
 lib/tdb/common/tdb_private.h |   37 +++++++
 lib/tdb/common/transaction.c |   66 +++++++++----
 lib/tdb/common/traverse.c    |   23 ++++-
 6 files changed, 366 insertions(+), 56 deletions(-)

diff --git a/lib/tdb/common/lock.c b/lib/tdb/common/lock.c
index 2c72ae1..3414049 100644
--- a/lib/tdb/common/lock.c
+++ b/lib/tdb/common/lock.c
@@ -422,48 +422,58 @@ static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
 /* lock entire database with write lock */
 int tdb_lockall(struct tdb_context *tdb)
 {
+	tdb_trace(tdb, "tdb_lockall");
 	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
 }
 
 /* lock entire database with write lock - mark only */
 int tdb_lockall_mark(struct tdb_context *tdb)
 {
+	tdb_trace(tdb, "tdb_lockall_mark");
 	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
 }
 
 /* unlock entire database with write lock - unmark only */
 int tdb_lockall_unmark(struct tdb_context *tdb)
 {
+	tdb_trace(tdb, "tdb_lockall_unmark");
 	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
 }
 
 /* lock entire database with write lock - nonblocking varient */
 int tdb_lockall_nonblock(struct tdb_context *tdb)
 {
-	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
+	int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);
+	tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
+	return ret;
 }
 
 /* unlock entire database with write lock */
 int tdb_unlockall(struct tdb_context *tdb)
 {
+	tdb_trace(tdb, "tdb_unlockall");
 	return _tdb_unlockall(tdb, F_WRLCK);
 }
 
 /* lock entire database with read lock */
 int tdb_lockall_read(struct tdb_context *tdb)
 {
+	tdb_trace(tdb, "tdb_lockall_read");
 	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
 }
 
 /* lock entire database with read lock - nonblock varient */
 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
 {
-	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
+	int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);
+	tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
+	return ret;
 }
 
 /* unlock entire database with read lock */
 int tdb_unlockall_read(struct tdb_context *tdb)
 {
+	tdb_trace(tdb, "tdb_unlockall_read");
 	return _tdb_unlockall(tdb, F_RDLCK);
 }
 
@@ -471,7 +481,9 @@ int tdb_unlockall_read(struct tdb_context *tdb)
    contention - it cannot guarantee how many records will be locked */
 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
 {
-	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
+	int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
+	tdb_trace_1rec(tdb, "tdb_chainlock", key);
+	return ret;
 }
 
 /* lock/unlock one hash chain, non-blocking. This is meant to be used
@@ -479,33 +491,43 @@ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
    locked */
 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
 {
-	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
+	int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
+	tdb_trace_1rec_ret(tdb, "tdb_chainlock_nonblock", key, ret);
+	return ret;
 }
 
 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
 {
-	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
+	int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
+	tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
+	return ret;
 }
 
 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
 {
+	tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
 }
 
 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 {
+	tdb_trace_1rec(tdb, "tdb_chainunlock", key);
 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
 }
 
 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
 {
-	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
+	int ret;
+	ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
+	tdb_trace_1rec(tdb, "tdb_chainlock_read", key);
+	return ret;
 }
 
 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
 {
+	tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
 	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
 }
 
diff --git a/lib/tdb/common/open.c b/lib/tdb/common/open.c
index 2e6a707..ab40a4d 100644
--- a/lib/tdb/common/open.c
+++ b/lib/tdb/common/open.c
@@ -164,7 +164,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
 	tdb->name = NULL;
 	tdb->map_ptr = NULL;
 	tdb->flags = tdb_flags;
-	tdb->open_flags = open_flags;
+	tdb->open_flags = (open_flags|TDB_NOSYNC);
 	if (log_ctx) {
 		tdb->log = *log_ctx;
 	} else {
@@ -205,6 +205,10 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
 			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
 			goto fail;
 		}
+#ifdef TDB_TRACE
+		/* All tracing will fail.  That's ok. */
+		tdb->tracefd = -1;
+#endif
 		goto internal;
 	}
 
@@ -313,6 +317,21 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
 		goto fail;
 	}
 
+#ifdef TDB_TRACE
+	{
+		char tracefile[strlen(name) + 32];
+
+		sprintf(tracefile, "%s.trace.%u", name, getpid());
+		tdb->tracefd = open(tracefile, O_WRONLY|O_CREAT|O_EXCL, 0600);
+		if (tdb->tracefd >= 0) {
+			tdb_enable_seqnum(tdb);
+			tdb_trace_open(tdb, "tdb_open", hash_size, tdb_flags,
+				       open_flags);
+		} else
+			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to open trace file %s!\n", tracefile));
+	}
+#endif
+
  internal:
 	/* Internal (memory-only) databases skip all the code above to
 	 * do with disk files, and resume here by releasing their
@@ -328,7 +347,10 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
 
 	if (!tdb)
 		return NULL;
-	
+
+#ifdef TDB_TRACE
+	close(tdb->tracefd);
+#endif
 	if (tdb->map_ptr) {
 		if (tdb->flags & TDB_INTERNAL)
 			SAFE_FREE(tdb->map_ptr);
@@ -364,8 +386,9 @@ int tdb_close(struct tdb_context *tdb)
 	struct tdb_context **i;
 	int ret = 0;
 
+	tdb_trace(tdb, "tdb_close");
 	if (tdb->transaction) {
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 	}
 
 	if (tdb->map_ptr) {
@@ -387,6 +410,9 @@ int tdb_close(struct tdb_context *tdb)
 		}
 	}
 
+#ifdef TDB_TRACE
+	close(tdb->tracefd);
+#endif
 	memset(tdb, 0, sizeof(*tdb));
 	SAFE_FREE(tdb);
 
diff --git a/lib/tdb/common/tdb.c b/lib/tdb/common/tdb.c
index b78f74c..dbb5c77 100644
--- a/lib/tdb/common/tdb.c
+++ b/lib/tdb/common/tdb.c
@@ -158,7 +158,7 @@ static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
  * then the TDB_DATA will have zero length but
  * a non-zero pointer
  */
-TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
+static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
 {
 	tdb_off_t rec_ptr;
 	struct list_struct rec;
@@ -177,6 +177,14 @@ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
 	return ret;
 }
 
+TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
+{
+	TDB_DATA ret = _tdb_fetch(tdb, key);
+
+	tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
+	return ret;
+}
+
 /*
  * Find an entry in the database and hand the record's data to a parsing
  * function. The parsing function is executed under the chain read lock, so it
@@ -207,8 +215,10 @@ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
 	hash = tdb->hash_fn(&key);
 
 	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
+		tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
 		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
 	}
+	tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
 
 	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
 			     rec.data_len, parser, private_data);
@@ -237,7 +247,11 @@ static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
 {
 	uint32_t hash = tdb->hash_fn(&key);
-	return tdb_exists_hash(tdb, key, hash);
+	int ret;
+
+	ret = tdb_exists_hash(tdb, key, hash);
+	tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
+	return ret;
 }
 
 /* actually delete an entry in the database given the offset */
@@ -392,7 +406,11 @@ static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
 {
 	uint32_t hash = tdb->hash_fn(&key);
-	return tdb_delete_hash(tdb, key, hash);
+	int ret;
+
+	ret = tdb_delete_hash(tdb, key, hash);
+	tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
+	return ret;
 }
 
 /*
@@ -424,29 +442,14 @@ static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
 	return 0;
 }
 
-/* store an element in the database, replacing any existing element
-   with the same key 
-
-   return 0 on success, -1 on failure
-*/
-int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
+static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
+		       TDB_DATA dbuf, int flag, uint32_t hash)
 {
 	struct list_struct rec;
-	uint32_t hash;
 	tdb_off_t rec_ptr;
 	char *p = NULL;
 	int ret = -1;
 
-	if (tdb->read_only || tdb->traverse_read) {
-		tdb->ecode = TDB_ERR_RDONLY;
-		return -1;
-	}
-
-	/* find which hash bucket it is in */
-	hash = tdb->hash_fn(&key);
-	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
-		return -1;
-
 	/* check for it existing, on insert. */
 	if (flag == TDB_INSERT) {
 		if (tdb_exists_hash(tdb, key, hash)) {
@@ -562,10 +565,35 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
 	}
 
 	SAFE_FREE(p); 
-	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
 	return ret;
 }
 
+/* store an element in the database, replacing any existing element
+   with the same key 
+
+   return 0 on success, -1 on failure
+*/
+int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
+{
+	uint32_t hash;
+	int ret;
+
+	if (tdb->read_only || tdb->traverse_read) {
+		tdb->ecode = TDB_ERR_RDONLY;
+		tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
+		return -1;
+	}
+
+	/* find which hash bucket it is in */
+	hash = tdb->hash_fn(&key);
+	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
+		return -1;
+
+	ret = _tdb_store(tdb, key, dbuf, flag, hash);
+	tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
+	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
+	return ret;
+}
 
 /* Append to an entry. Create if not exist. */
 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
@@ -579,7 +607,7 @@ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
 	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
 		return -1;
 
-	dbuf = tdb_fetch(tdb, key);
+	dbuf = _tdb_fetch(tdb, key);
 
 	if (dbuf.dptr == NULL) {
 		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
@@ -605,7 +633,8 @@ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
 	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
 	dbuf.dsize += new_dbuf.dsize;
 
-	ret = tdb_store(tdb, key, dbuf, 0);
+	ret = _tdb_store(tdb, key, dbuf, 0, hash);
+	tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
 	
 failed:
 	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
@@ -739,6 +768,8 @@ int tdb_wipe_all(struct tdb_context *tdb)
 		return -1;
 	}
 
+	tdb_trace(tdb, "tdb_wipe_all");
+
 	/* see if the tdb has a recovery area, and remember its size
 	   if so. We don't want to lose this as otherwise each
 	   tdb_wipe_all() in a transaction will increase the size of
@@ -837,6 +868,8 @@ int tdb_repack(struct tdb_context *tdb)
 	struct tdb_context *tmp_db;
 	struct traverse_state state;
 
+	tdb_trace(tdb, "tdb_repack");
+
 	if (tdb_transaction_start(tdb) != 0) {
 		TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
 		return -1;
@@ -899,3 +932,154 @@ int tdb_repack(struct tdb_context *tdb)
 
 	return 0;
 }
+
+#ifdef TDB_TRACE
+static void tdb_trace_write(struct tdb_context *tdb, const char *str)
+{
+	if (write(tdb->tracefd, str, strlen(str)) != strlen(str)) {
+		close(tdb->tracefd);
+		tdb->tracefd = -1;
+	}
+}
+
+static void tdb_trace_start(struct tdb_context *tdb)
+{
+	tdb_off_t seqnum=0;
+	char msg[sizeof(tdb_off_t) * 4];
+
+	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
+	sprintf(msg, "%u ", seqnum);
+	tdb_trace_write(tdb, msg);
+}
+
+static void tdb_trace_end(struct tdb_context *tdb)
+{
+	tdb_trace_write(tdb, "\n");
+}
+
+static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
+{
+	char msg[sizeof(ret) * 4];
+	sprintf(msg, " = %i\n", ret);
+	tdb_trace_write(tdb, msg);
+}
+
+static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
+{
+	char msg[20 + rec.dsize*2], *p;
+	unsigned int i;
+
+	/* We differentiate zero-length records from non-existent ones. */
+	if (rec.dptr == NULL) {
+		tdb_trace_write(tdb, " NULL");
+		return;
+	}
+
+	p = msg;
+	p += sprintf(p, " %zu:", rec.dsize);
+	for (i = 0; i < rec.dsize; i++)
+		p += sprintf(p, "%02x", rec.dptr[i]);
+
+	tdb_trace_write(tdb, msg);
+}
+
+void tdb_trace(struct tdb_context *tdb, const char *op)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_end(tdb);
+}
+
+void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
+{
+	char msg[sizeof(tdb_off_t) * 4];
+
+	sprintf(msg, "%u ", seqnum);
+	tdb_trace_write(tdb, msg);
+	tdb_trace_write(tdb, op);
+	tdb_trace_end(tdb);
+}
+
+void tdb_trace_open(struct tdb_context *tdb, const char *op,
+		    unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
+{
+	char msg[128];
+
+	sprintf(msg, "%s %u %#x %#x", op, hash_size, tdb_flags, open_flags);
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, msg);
+	tdb_trace_end(tdb);
+}
+
+void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_end_ret(tdb, ret);
+}
+
+void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_write(tdb, " =");
+	tdb_trace_record(tdb, ret);
+	tdb_trace_end(tdb);
+}
+
+void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
+		    TDB_DATA rec)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_record(tdb, rec);
+	tdb_trace_end(tdb);
+}
+
+void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
+			TDB_DATA rec, int ret)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_record(tdb, rec);
+	tdb_trace_end_ret(tdb, ret);
+}
+
+void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
+			   TDB_DATA rec, TDB_DATA ret)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_record(tdb, rec);
+	tdb_trace_write(tdb, " =");
+	tdb_trace_record(tdb, ret);
+	tdb_trace_end(tdb);
+}
+
+void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
+			     TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
+			     int ret)
+{
+	char msg[sizeof(ret) * 4];
+
+	sprintf(msg, " %#x", flag); 
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_record(tdb, rec1);
+	tdb_trace_record(tdb, rec2);
+	tdb_trace_write(tdb, msg);
+	tdb_trace_end_ret(tdb, ret);
+}
+
+void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
+			   TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
+{
+	tdb_trace_start(tdb);
+	tdb_trace_write(tdb, op);
+	tdb_trace_record(tdb, rec1);
+	tdb_trace_record(tdb, rec2);
+	tdb_trace_write(tdb, " =");
+	tdb_trace_record(tdb, ret);
+	tdb_trace_end(tdb);
+}
+#endif
diff --git a/lib/tdb/common/tdb_private.h b/lib/tdb/common/tdb_private.h
index 45b85f4..cc3bbd5 100644
--- a/lib/tdb/common/tdb_private.h
+++ b/lib/tdb/common/tdb_private.h
@@ -31,6 +31,7 @@
 #include "system/wait.h"
 #include "tdb.h"
 
+/* #define TDB_TRACE 1 */
 #ifndef HAVE_GETPAGESIZE
 #define getpagesize() 0x2000
 #endif
@@ -68,6 +69,37 @@ typedef uint32_t tdb_off_t;
  * argument. */
 #define TDB_LOG(x) tdb->log.log_fn x
 
+#ifdef TDB_TRACE
+void tdb_trace(struct tdb_context *tdb, const char *op);
+void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op);
+void tdb_trace_open(struct tdb_context *tdb, const char *op,
+		    unsigned hash_size, unsigned tdb_flags, unsigned open_flags);
+void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret);
+void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret);
+void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
+		    TDB_DATA rec);
+void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
+			TDB_DATA rec, int ret);
+void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
+			   TDB_DATA rec, TDB_DATA ret);
+void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
+			     TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
+			     int ret);
+void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
+			   TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret);
+#else
+#define tdb_trace(tdb, op)
+#define tdb_trace_seqnum(tdb, seqnum, op)
+#define tdb_trace_open(tdb, op, hash_size, tdb_flags, open_flags)
+#define tdb_trace_ret(tdb, op, ret)
+#define tdb_trace_retrec(tdb, op, ret)
+#define tdb_trace_1rec(tdb, op, rec)
+#define tdb_trace_1rec_ret(tdb, op, rec, ret)
+#define tdb_trace_1rec_retrec(tdb, op, rec, ret)
+#define tdb_trace_2rec_flag_ret(tdb, op, rec1, rec2, flag, ret)
+#define tdb_trace_2rec_retrec(tdb, op, rec1, rec2, ret)
+#endif /* !TDB_TRACE */
+
 /* lock offsets */
 #define GLOBAL_LOCK      0
 #define ACTIVE_LOCK      4
@@ -167,6 +199,10 @@ struct tdb_context {
 	int page_size;
 	int max_dead_records;
 	int transaction_lock_count;
+#ifdef TDB_TRACE
+	int tracefd;
+	uint32_t transaction_prepare_seqnum;
+#endif
 	volatile sig_atomic_t *interrupt_sig_ptr;
 };
 
@@ -194,6 +230,7 @@ int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
+int _tdb_transaction_cancel(struct tdb_context *tdb);
 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
diff --git a/lib/tdb/common/transaction.c b/lib/tdb/common/transaction.c
index e97fe67..2302a72 100644
--- a/lib/tdb/common/transaction.c
+++ b/lib/tdb/common/transaction.c
@@ -510,6 +510,8 @@ int tdb_transaction_start(struct tdb_context *tdb)
 	tdb->transaction->io_methods = tdb->methods;
 	tdb->methods = &transaction_methods;
 
+	/* Trace at the end, so we get sequence number correct. */
+	tdb_trace(tdb, "tdb_transaction_start");
 	return 0;
 	
 fail:
@@ -552,10 +554,7 @@ static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t
 }
 
 
-/*
-  cancel the current transaction
-*/
-int tdb_transaction_cancel(struct tdb_context *tdb)
+int _tdb_transaction_cancel(struct tdb_context *tdb)
 {	
 	int i, ret = 0;
 
@@ -620,6 +619,14 @@ int tdb_transaction_cancel(struct tdb_context *tdb)
 	return ret;
 }
 
+/*
+  cancel the current transaction
+*/
+int tdb_transaction_cancel(struct tdb_context *tdb)
+{
+	tdb_trace(tdb, "tdb_transaction_cancel");
+	return _tdb_transaction_cancel(tdb);
+}
 
 /*
   work out how much space the linearised recovery data will consume
@@ -867,10 +874,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
 	return 0;
 }
 
-/*
-  prepare to commit the current transaction
-*/
-int tdb_transaction_prepare_commit(struct tdb_context *tdb)
+static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 {	
 	const struct tdb_methods *methods;
 
@@ -881,14 +885,14 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 
 	if (tdb->transaction->prepared) {
 		tdb->ecode = TDB_ERR_EINVAL;
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
 		return -1;
 	}
 
 	if (tdb->transaction->transaction_error) {
 		tdb->ecode = TDB_ERR_IO;
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
 		return -1;
 	}
@@ -898,6 +902,11 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 		return 0;
 	}		
 
+#ifdef TDB_TRACE
+	/* store seqnum now, before reading becomes illegal. */
+	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
+#endif
+
 	/* check for a null transaction */
 	if (tdb->transaction->blocks == NULL) {
 		return 0;
@@ -910,7 +919,7 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 	if (tdb->num_locks || tdb->global_lock.count) {
 		tdb->ecode = TDB_ERR_LOCK;
 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		return -1;
 	}
 
@@ -918,7 +927,7 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
 		tdb->ecode = TDB_ERR_LOCK;
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		return -1;
 	}
 
@@ -927,7 +936,7 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
 		tdb->ecode = TDB_ERR_LOCK;
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		return -1;
 	}
 
@@ -936,7 +945,7 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 		if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-			tdb_transaction_cancel(tdb);
+			_tdb_transaction_cancel(tdb);
 			return -1;
 		}
 	}
@@ -951,7 +960,7 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 			tdb->ecode = TDB_ERR_IO;
 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-			tdb_transaction_cancel(tdb);
+			_tdb_transaction_cancel(tdb);
 			return -1;
 		}
 		tdb->map_size = tdb->transaction->old_map_size;
@@ -964,6 +973,15 @@ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 }
 
 /*
+   prepare to commit the current transaction
+*/
+int tdb_transaction_prepare_commit(struct tdb_context *tdb)
+{	
+	tdb_trace(tdb, "tdb_transaction_prepare_commit");
+	return _tdb_transaction_prepare_commit(tdb);
+}
+
+/*
   commit the current transaction
 */
 int tdb_transaction_commit(struct tdb_context *tdb)
@@ -977,9 +995,17 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 		return -1;
 	}
 
+	/* If we've prepared, can't read seqnum. */
+	if (tdb->transaction->prepared) {
+		tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
+				 "tdb_transaction_commit");
+	} else {
+		tdb_trace(tdb, "tdb_transaction_commit");
+	}
+
 	if (tdb->transaction->transaction_error) {
 		tdb->ecode = TDB_ERR_IO;
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
 		return -1;
 	}
@@ -992,12 +1018,12 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
 	/* check for a null transaction */
 	if (tdb->transaction->blocks == NULL) {
-		tdb_transaction_cancel(tdb);
+		_tdb_transaction_cancel(tdb);
 		return 0;
 	}
 
 	if (!tdb->transaction->prepared) {
-		int ret = tdb_transaction_prepare_commit(tdb);
+		int ret = _tdb_transaction_prepare_commit(tdb);
 		if (ret)
 			return ret;
 	}
@@ -1028,7 +1054,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 			tdb->methods = methods;
 			tdb_transaction_recover(tdb); 
 
-			tdb_transaction_cancel(tdb);
+			_tdb_transaction_cancel(tdb);
 			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
@@ -1066,7 +1092,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
 	/* use a transaction cancel to free memory and remove the
 	   transaction locks */
-	tdb_transaction_cancel(tdb);
+	_tdb_transaction_cancel(tdb);
 
 	if (need_repack) {
 		return tdb_repack(tdb);
diff --git a/lib/tdb/common/traverse.c b/lib/tdb/common/traverse.c
index 8e5a63a..3c88c87 100644
--- a/lib/tdb/common/traverse.c
+++ b/lib/tdb/common/traverse.c
@@ -178,6 +178,8 @@ static int tdb_traverse_internal(struct tdb_context *tdb,
 		dbuf.dptr = key.dptr + rec.key_len;
 		dbuf.dsize = rec.data_len;
 
+		tdb_trace_1rec_retrec(tdb, "traverse", key, dbuf);
+
 		/* Drop chain lock, call out */
 		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
 			ret = -1;
@@ -186,6 +188,7 @@ static int tdb_traverse_internal(struct tdb_context *tdb,
 		}
 		if (fn && fn(tdb, key, dbuf, private_data)) {
 			/* They want us to terminate traversal */
+			tdb_trace_ret(tdb, "tdb_traverse_end", count);
 			if (tdb_unlock_record(tdb, tl->off) != 0) {
 				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
 				ret = -1;
@@ -195,6 +198,7 @@ static int tdb_traverse_internal(struct tdb_context *tdb,
 		}
 		SAFE_FREE(key.dptr);
 	}
+	tdb_trace(tdb, "tdb_traverse_end");
 out:
 	tdb->travlocks.next = tl->next;
 	if (ret < 0)
@@ -220,6 +224,7 @@ int tdb_traverse_read(struct tdb_context *tdb,
 	}
 
 	tdb->traverse_read++;
+	tdb_trace(tdb, "tdb_traverse_read_start");
 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
 	tdb->traverse_read--;
 
@@ -244,12 +249,13 @@ int tdb_traverse(struct tdb_context *tdb,
 	if (tdb->read_only || tdb->traverse_read) {
 		return tdb_traverse_read(tdb, fn, private_data);
 	}
-	
+
 	if (tdb_transaction_lock(tdb, F_WRLCK)) {
 		return -1;
 	}
 
 	tdb->traverse_write++;
+	tdb_trace(tdb, "tdb_traverse_start");
 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
 	tdb->traverse_write--;
 
@@ -273,13 +279,17 @@ TDB_DATA tdb_firstkey(struct tdb_context *tdb)
 	tdb->travlocks.lock_rw = F_RDLCK;
 
 	/* Grab first record: locks chain and returned record. */
- 	off = tdb_next_lock(tdb, &tdb->travlocks, &rec);
- 	if (off == 0 || off == TDB_NEXT_LOCK_ERR)
+	off = tdb_next_lock(tdb, &tdb->travlocks, &rec);
+	if (off == 0 || off == TDB_NEXT_LOCK_ERR) {
+		tdb_trace_retrec(tdb, "tdb_firstkey", tdb_null);
 		return tdb_null;
+	}
 	/* now read the key */
 	key.dsize = rec.key_len;
 	key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
 
+	tdb_trace_retrec(tdb, "tdb_firstkey", key);
+
 	/* Unlock the hash chain of the record we just read. */
 	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
@@ -305,6 +315,8 @@ TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
 		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
 			/* No, it wasn't: unlock it and start from scratch */
 			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
+				tdb_trace_1rec_retrec(tdb, "tdb_nextkey",
+						      oldkey, tdb_null);
 				SAFE_FREE(k);
 				return tdb_null;
 			}
@@ -321,8 +333,10 @@ TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
 	if (!tdb->travlocks.off) {
 		/* No previous element: do normal find, and lock record */
 		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
-		if (!tdb->travlocks.off)
+		if (!tdb->travlocks.off) {
+			tdb_trace_1rec_retrec(tdb, "tdb_nextkey", oldkey, tdb_null);
 			return tdb_null;
+		}
 		tdb->travlocks.hash = BUCKET(rec.full_hash);
 		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
 			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
@@ -345,6 +359,7 @@ TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
 	/* Unlock the chain of old record */
 	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
+	tdb_trace_1rec_retrec(tdb, "tdb_nextkey", oldkey, key);
 	return key;
 }
 
-- 
1.6.0.4




More information about the samba-technical mailing list