[PATCH] SMB2_FIND improvement for clustered environments

Ralph Böhme slow at samba.org
Tue Apr 18 12:30:19 UTC 2017


On Fri, Mar 31, 2017 at 08:05:18PM +0200, Ralph Böhme wrote:
> On Fri, Mar 31, 2017 at 10:52:27AM -0700, Jeremy Allison wrote:
> > On Thu, Mar 30, 2017 at 08:14:41AM +0200, Volker Lendecke via samba-technical wrote:
> > > On Wed, Mar 29, 2017 at 01:54:27PM -0700, Jeremy Allison via samba-technical wrote:
> > > > I think that's horribly complicated :-). Can we just turn
> > > > it on by default, and have an (undocumented) parametric
> > > > option to turn if off that we can remove once we're
> > > > confident it works correctly ?
> > > > 
> > > > Don't want to drop this on the floor :-).
> > > 
> > > Always turn it on please. The existing search ask sharemode option is
> > > a functionality change that should stay as yes/no option, but if we're
> > > in clustered mode we should not do the rabbit pellet mode.
> > 
> > Yeah, fair enough. I'm happy for it to be "always on".
> > We need to be confident enough in the changes to do this
> > anyway.
> 
> ok. Will post an updated patchset. Thanks!

attached. No more "async search ask sharemode" option, it's now always on and
there's no way back. The old option "search ask sharemode" is still there of
course.

Thanks!
-slow
-------------- next part --------------
From bda6350566780adb9039a18036f7deb3cb75ae3d Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Wed, 22 Feb 2017 17:21:15 +0100
Subject: [PATCH 01/16] lib/util: add and use iov_concat

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 lib/util/iov_buf.c        | 21 +++++++++++++++++++++
 lib/util/iov_buf.h        |  6 +++---
 lib/util/wscript_build    |  1 +
 libcli/smb/smbXcli_base.c | 26 ++------------------------
 4 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/lib/util/iov_buf.c b/lib/util/iov_buf.c
index d260b2f..592bc5d 100644
--- a/lib/util/iov_buf.c
+++ b/lib/util/iov_buf.c
@@ -20,6 +20,7 @@
 #include "replace.h"
 #include "system/filesys.h"
 #include "iov_buf.h"
+#include <talloc.h>
 
 ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
 {
@@ -90,3 +91,23 @@ bool iov_advance(struct iovec **iov, int *iovcnt, size_t n)
 	*iovcnt = cnt;
 	return true;
 }
+
+uint8_t *iov_concat(TALLOC_CTX *mem_ctx, const struct iovec *iov, int count)
+{
+	ssize_t buflen;
+	uint8_t *buf;
+
+	buflen = iov_buflen(iov, count);
+	if (buflen == -1) {
+		return NULL;
+	}
+
+	buf = talloc_array(mem_ctx, uint8_t, buflen);
+	if (buf == NULL) {
+		return NULL;
+	}
+
+	iov_buf(iov, count, buf, buflen);
+
+	return buf;
+}
diff --git a/lib/util/iov_buf.h b/lib/util/iov_buf.h
index 8f0ca26..79b81b8 100644
--- a/lib/util/iov_buf.h
+++ b/lib/util/iov_buf.h
@@ -20,13 +20,13 @@
 #ifndef __LIB_IOV_BUF_H__
 #define __LIB_IOV_BUF_H__
 
-#include <unistd.h>
-#include <stdint.h>
-#include <stdbool.h>
+#include "replace.h"
+#include <talloc.h>
 
 ssize_t iov_buflen(const struct iovec *iov, int iovlen);
 ssize_t iov_buf(const struct iovec *iov, int iovcnt,
 		uint8_t *buf, size_t buflen);
 bool iov_advance(struct iovec **iov, int *iovcnt, size_t n);
+uint8_t *iov_concat(TALLOC_CTX *mem_ctx, const struct iovec *iov, int count);
 
 #endif
diff --git a/lib/util/wscript_build b/lib/util/wscript_build
index 91505eb..d093947 100644
--- a/lib/util/wscript_build
+++ b/lib/util/wscript_build
@@ -78,6 +78,7 @@ bld.SAMBA_SUBSYSTEM('samba-util-core',
 
 bld.SAMBA_LIBRARY('iov_buf',
                   source='iov_buf.c',
+                  deps='talloc',
                   local_include=False,
                   private_library=True)
 
diff --git a/libcli/smb/smbXcli_base.c b/libcli/smb/smbXcli_base.c
index 1fcf11e..1ec11a9 100644
--- a/libcli/smb/smbXcli_base.c
+++ b/libcli/smb/smbXcli_base.c
@@ -1343,28 +1343,6 @@ static size_t smbXcli_iov_len(const struct iovec *iov, int count)
 	return ret;
 }
 
-static uint8_t *smbXcli_iov_concat(TALLOC_CTX *mem_ctx,
-				   const struct iovec *iov,
-				   int count)
-{
-	ssize_t buflen;
-	uint8_t *buf;
-
-	buflen = iov_buflen(iov, count);
-	if (buflen == -1) {
-		return NULL;
-	}
-
-	buf = talloc_array(mem_ctx, uint8_t, buflen);
-	if (buf == NULL) {
-		return NULL;
-	}
-
-	iov_buf(iov, count, buf, buflen);
-
-	return buf;
-}
-
 static void smb1cli_req_flags(enum protocol_types protocol,
 			      uint32_t smb1_capabilities,
 			      uint8_t smb_command,
@@ -1647,7 +1625,7 @@ static NTSTATUS smb1cli_conn_signv(struct smbXcli_conn *conn,
 
 	frame = talloc_stackframe();
 
-	buf = smbXcli_iov_concat(frame, &iov[1], iov_count - 1);
+	buf = iov_concat(frame, &iov[1], iov_count - 1);
 	if (buf == NULL) {
 		return NT_STATUS_NO_MEMORY;
 	}
@@ -1739,7 +1717,7 @@ static NTSTATUS smb1cli_req_writev_submit(struct tevent_req *req,
 	if (common_encryption_on(state->conn->smb1.trans_enc)) {
 		char *buf, *enc_buf;
 
-		buf = (char *)smbXcli_iov_concat(talloc_tos(), iov, iov_count);
+		buf = (char *)iov_concat(talloc_tos(), iov, iov_count);
 		if (buf == NULL) {
 			return NT_STATUS_NO_MEMORY;
 		}
-- 
2.9.3


From d6bc73034faad62f40d29a68351bcf5a550ae8a6 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 10 Jan 2017 14:22:21 +0100
Subject: [PATCH 02/16] dbwrap: add enum dbwrap_req_state

This will be used by async dbwrap_parse_send() as an out argument, giving
the caller an indication about the state of the request.

This can be useful if the caller is a sync function that sends multiple
async dbwrap requests. As it's a sync function it won't return to the
main tevent event loop, so the async dbwrap recv functions are not
called.

As a result the function may deadlock: our receive queue may already be
full with results from a peer, the peer might be blocked in its send
queue (because we're not receiving), and the peer therefore doesn't read
from its receive queue, so our send queue will block as well.

To inform the caller of this "send queue full" situation, we return this
state information to the caller of the dbwrap send function.
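
A rough sketch of how a sync caller might check this out argument (using
the dbwrap_parse_record_send() wrapper added later in this series; the
parser callback and its private data are only illustrative):

    enum dbwrap_req_state req_state;
    struct tevent_req *req;

    req = dbwrap_parse_record_send(mem_ctx, ev, db, key,
                                   my_parser, my_state, &req_state);
    if (req == NULL) {
            /* fail with NT_STATUS_NO_MEMORY */
    }
    if (req_state < DBWRAP_REQ_DISPATCHED) {
            /*
             * The request is still stuck in our local send queue.
             * Stop issuing new requests and run the event loop
             * until some of them have drained.
             */
    }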

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 lib/dbwrap/dbwrap.h | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/lib/dbwrap/dbwrap.h b/lib/dbwrap/dbwrap.h
index 6b77236..936e662 100644
--- a/lib/dbwrap/dbwrap.h
+++ b/lib/dbwrap/dbwrap.h
@@ -43,6 +43,29 @@ enum dbwrap_lock_order {
 #define DBWRAP_FLAG_NONE                     0x0000000000000000ULL
 #define DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS 0x0000000000000001ULL
 
+enum dbwrap_req_state {
+	/**
+	 * We are creating the request
+	 */
+	DBWRAP_REQ_INIT,
+	/**
+	 * The request is queued and waiting to be dispatched
+	 */
+	DBWRAP_REQ_QUEUED,
+	/**
+	 * We are waiting to receive the reply
+	 */
+	DBWRAP_REQ_DISPATCHED,
+	/**
+	 * The request is finished
+	 */
+	DBWRAP_REQ_DONE,
+	/**
+	 * The request errored out
+	 */
+	DBWRAP_REQ_ERROR
+};
+
 /* The following definitions come from lib/dbwrap.c  */
 
 TDB_DATA dbwrap_record_get_key(const struct db_record *rec);
-- 
2.9.3


From 0a8d49f9368c0b1bca732fc758c5cb671fc7c7c1 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 10 Jan 2017 14:48:07 +0100
Subject: [PATCH 03/16] dbwrap: add parse_record_send/recv to struct db_context

The implementation comes next.

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 lib/dbwrap/dbwrap_private.h | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/lib/dbwrap/dbwrap_private.h b/lib/dbwrap/dbwrap_private.h
index 15ebbc9..9b50ccc 100644
--- a/lib/dbwrap/dbwrap_private.h
+++ b/lib/dbwrap/dbwrap_private.h
@@ -23,6 +23,9 @@
 #ifndef __DBWRAP_PRIVATE_H__
 #define __DBWRAP_PRIVATE_H__
 
+struct tevent_context;
+struct tevent_req;
+
 struct db_record {
 	struct db_context *db;
 	TDB_DATA key, value;
@@ -55,6 +58,15 @@ struct db_context {
 				 void (*parser)(TDB_DATA key, TDB_DATA data,
 						void *private_data),
 				 void *private_data);
+	struct tevent_req *(*parse_record_send)(
+		TALLOC_CTX *mem_ctx,
+		struct tevent_context *ev,
+		struct db_context *db,
+		TDB_DATA key,
+		void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
+		void *private_data,
+		enum dbwrap_req_state *req_state);
+	NTSTATUS (*parse_record_recv)(struct tevent_req *req);
 	int (*exists)(struct db_context *db,TDB_DATA key);
 	int (*wipe)(struct db_context *db);
 	int (*check)(struct db_context *db);
-- 
2.9.3


From 0d34ed98dc760e37884113a36c3613c77c0054c8 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Mon, 9 Jan 2017 08:17:02 +0100
Subject: [PATCH 04/16] ctdb_conn: add ctdbd_parse_send/recv

Implement the ctdb packet layer for async parse send/recv with tevent.

ctdbd_setup_fde() is used to create an fde from the
connection fd and will be called from dbwrap_ctdb.

ctdbd_parse_send() and ctdbd_parse_recv() will be used by dbwrap_ctdb
for async packet sending and receiving.
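
A rough usage sketch (error handling trimmed; the parser callback and the
way the event loop is driven are only illustrative):

    static void my_parser(TDB_DATA key, TDB_DATA data, void *private_data)
    {
            /* inspect the fetched record */
    }

    ret = ctdbd_setup_fde(conn, ev);      /* once per async connection */

    req = ctdbd_parse_send(mem_ctx, ev, conn, db_id, key,
                           false, /* local_copy */
                           my_parser, NULL, &req_state);
    /* run the tevent loop until the request's callback fires, then: */
    ret = ctdbd_parse_recv(req);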

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/include/ctdbd_conn.h |  15 +
 source3/lib/ctdbd_conn.c     | 844 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 855 insertions(+), 4 deletions(-)

diff --git a/source3/include/ctdbd_conn.h b/source3/include/ctdbd_conn.h
index bbebbce..06fbcc3 100644
--- a/source3/include/ctdbd_conn.h
+++ b/source3/include/ctdbd_conn.h
@@ -23,6 +23,7 @@
 #include "replace.h"
 #include "system/filesys.h"
 #include "system/network.h"
+#include "lib/dbwrap/dbwrap.h"
 #include <tdb.h>
 #include <tevent.h>
 
@@ -36,6 +37,7 @@ int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
 int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
 			    const char *sockname, int timeout,
 			    struct ctdbd_connection *conn);
+int ctdbd_setup_fde(struct ctdbd_connection *conn, struct tevent_context *ev);
 
 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn);
 
@@ -94,4 +96,17 @@ int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
 			void *private_data);
 int ctdbd_probe(const char *sockname, int timeout);
 
+struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
+				    struct tevent_context *ev,
+				    struct ctdbd_connection *conn,
+				    uint32_t db_id,
+				    TDB_DATA key,
+				    bool local_copy,
+				    void (*parser)(TDB_DATA key,
+						   TDB_DATA data,
+						   void *private_data),
+				    void *private_data,
+				    enum dbwrap_req_state *req_state);
+int ctdbd_parse_recv(struct tevent_req *req);
+
 #endif /* _CTDBD_CONN_H */
diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c
index d16796f..c629d3c 100644
--- a/source3/lib/ctdbd_conn.c
+++ b/source3/lib/ctdbd_conn.c
@@ -19,6 +19,7 @@
 */
 
 #include "replace.h"
+#include <tevent.h>
 #include "util_tdb.h"
 #include "serverid.h"
 #include "ctdbd_conn.h"
@@ -30,6 +31,10 @@
 #include "lib/util/talloc_stack.h"
 #include "lib/util/genrand.h"
 #include "lib/util/fault.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/tevent_unix.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/blocking.h"
 
 /* paths to these include files come from --with-ctdb= in configure */
 
@@ -44,6 +49,9 @@ struct ctdbd_srvid_cb {
 	void *private_data;
 };
 
+struct ctdb_pkt_send_state;
+struct ctdb_pkt_recv_state;
+
 struct ctdbd_connection {
 	uint32_t reqid;
 	uint32_t our_vnn;
@@ -51,8 +59,44 @@ struct ctdbd_connection {
 	struct ctdbd_srvid_cb *callbacks;
 	int fd;
 	int timeout;
+
+	/* For async connections, enabled via ctdbd_setup_fde() */
+	struct tevent_fd *fde;
+
+	/* State to track in-progress read */
+	struct ctdb_read_state {
+		/* Receive buffer for the initial packet length */
+		uint32_t msglen;
+
+		/* iovec state for current read */
+		struct iovec iov;
+		struct iovec *iovs;
+		int iovcnt;
+
+		/* allocated receive buffer based on packet length */
+		struct ctdb_req_header *hdr;
+	} read_state;
+
+	/* Lists of pending async reads and writes */
+	struct ctdb_pkt_recv_state *recv_list;
+	struct ctdb_pkt_send_state *send_list;
 };
 
+static void ctdbd_async_socket_handler(struct tevent_context *ev,
+				       struct tevent_fd *fde,
+				       uint16_t flags,
+				       void *private_data);
+
+static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
+{
+	return (conn->send_list != NULL);
+}
+
+static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
+{
+	return (conn->fde != NULL);
+}
+
 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
 {
 	conn->reqid += 1;
@@ -391,14 +435,28 @@ static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
 	return 0;
 }
 
-static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+/**
+ * This prepares conn for handling async requests
+ **/
+int ctdbd_setup_fde(struct ctdbd_connection *conn, struct tevent_context *ev)
 {
-	if (c->fd != -1) {
-		close(c->fd);
-		c->fd = -1;
+	set_blocking(conn->fd, false);
+
+	conn->fde = tevent_add_fd(ev,
+				  conn,
+				  conn->fd,
+				  TEVENT_FD_READ,
+				  ctdbd_async_socket_handler,
+				  conn);
+	if (conn->fde == NULL) {
+		return ENOMEM;
 	}
+
 	return 0;
 }
+
+static int ctdbd_connection_destructor(struct ctdbd_connection *c);
+
 /*
  * Get us a ctdbd connection
  */
@@ -547,6 +605,41 @@ void ctdbd_socket_readable(struct ctdbd_connection *conn)
 	}
 }
 
+static int ctdb_pkt_send_handler(struct ctdbd_connection *conn);
+static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn);
+
+/* Used for async connections and async ctdb requests */
+static void ctdbd_async_socket_handler(struct tevent_context *ev,
+				       struct tevent_fd *fde,
+				       uint16_t flags,
+				       void *private_data)
+{
+	struct ctdbd_connection *conn = talloc_get_type_abort(
+		private_data, struct ctdbd_connection);
+	int ret;
+
+	if ((flags & TEVENT_FD_READ) != 0) {
+		ret = ctdb_pkt_recv_handler(conn);
+		if (ret != 0) {
+			DBG_DEBUG("ctdb_pkt_recv_handler returned %s\n",
+				  strerror(ret));
+		}
+		return;
+	}
+
+	if ((flags & TEVENT_FD_WRITE) != 0) {
+		ret = ctdb_pkt_send_handler(conn);
+		if (ret != 0) {
+			DBG_DEBUG("ctdb_pkt_send_handler returned %s\n",
+				  strerror(ret));
+			return;
+		}
+		return;
+	}
+
+	return;
+}
+
 int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
 			     uint32_t dst_vnn, uint64_t dst_srvid,
 			     const struct iovec *iov, int iovlen)
@@ -600,6 +693,17 @@ static int ctdbd_control(struct ctdbd_connection *conn,
 	ssize_t nwritten;
 	int ret;
 
+	if (ctdbd_conn_has_async_reqs(conn)) {
+		/*
+		 * Can't use sync call while an async call is in flight. Adding
+		 * this check as a safety net. We'll be using different
+		 * connections for sync and async requests, so this shouldn't
+		 * happen, but who knows...
+		 */
+		DBG_ERR("Async ctdb req on sync connection\n");
+		return EINVAL;
+	}
+
 	ZERO_STRUCT(req);
 	req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
 	req.hdr.ctdb_magic   = CTDB_MAGIC;
@@ -768,6 +872,17 @@ int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
 	ssize_t nwritten;
 	int ret;
 
+	if (ctdbd_conn_has_async_reqs(conn)) {
+		/*
+		 * Can't use sync call while an async call is in flight. Adding
+		 * this check as a safety net. We'll be using different
+		 * connections for sync and async requests, so this shouldn't
+		 * happen, but who knows...
+		 */
+		DBG_ERR("Async ctdb req on sync connection\n");
+		return EINVAL;
+	}
+
 	ZERO_STRUCT(req);
 
 	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
@@ -833,6 +948,17 @@ int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
 	uint32_t flags;
 	int ret;
 
+	if (ctdbd_conn_has_async_reqs(conn)) {
+		/*
+		 * Can't use sync call while an async call is in flight. Adding
+		 * this check as a safety net. We'll be using different
+		 * connections for sync and async requests, so this shouldn't
+		 * happen, but who knows...
+		 */
+		DBG_ERR("Async ctdb req on sync connection\n");
+		return EINVAL;
+	}
+
 	flags = local_copy ? CTDB_WANT_READONLY : 0;
 
 	ZERO_STRUCT(req);
@@ -904,6 +1030,17 @@ int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
 	struct ctdb_traverse_start t;
 	int32_t cstatus;
 
+	if (ctdbd_conn_has_async_reqs(conn)) {
+		/*
+		 * Can't use sync call while an async call is in flight. Adding
+		 * this check as a safety net. We'll be using different
+		 * connections for sync and async requests, so this shouldn't
+		 * happen, but who knows...
+		 */
+		DBG_ERR("Async ctdb req on sync connection\n");
+		return EINVAL;
+	}
+
 	t.db_id = db_id;
 	t.srvid = conn->rand_srvid;
 	t.reqid = ctdbd_next_reqid(conn);
@@ -1143,3 +1280,702 @@ int ctdbd_probe(const char *sockname, int timeout)
 
 	return ret;
 }
+
+struct ctdb_pkt_send_state {
+	struct ctdb_pkt_send_state *prev, *next;
+	struct tevent_context *ev;
+	struct ctdbd_connection *conn;
+
+	/* ctdb request id */
+	uint32_t reqid;
+
+	/* the associated tevent request */
+	struct tevent_req *req;
+
+	/* iovec array with data to send */
+	struct iovec _iov;
+	struct iovec *iov;
+	int iovcnt;
+
+	/* Initial packet length */
+	size_t packet_len;
+};
+
+static void ctdb_pkt_send_cleanup(struct tevent_req *req,
+				  enum tevent_req_state req_state);
+
+/**
+ * Asynchronously send a ctdb packet given as iovec array
+ *
+ * Note: the passed iov array is not const here. Similar
+ * functions in samba take a const array and create a copy
+ * before calling iov_advance() on the array.
+ *
+ * This function will modify the iov array! But
+ * this is a static function and our only caller
+ * ctdbd_parse_send/recv is prepared for this to
+ * happen!
+ **/
+static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
+					     struct tevent_context *ev,
+					     struct ctdbd_connection *conn,
+					     uint32_t reqid,
+					     struct iovec *iov,
+					     int iovcnt,
+					     enum dbwrap_req_state *req_state)
+{
+	struct tevent_req *req = NULL;
+	struct ctdb_pkt_send_state *state = NULL;
+	ssize_t nwritten;
+	bool ok;
+
+	DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	*state = (struct ctdb_pkt_send_state) {
+		.ev = ev,
+		.conn = conn,
+		.req = req,
+		.reqid = reqid,
+		.iov = iov,
+		.iovcnt = iovcnt,
+		.packet_len = iov_buflen(iov, iovcnt),
+	};
+
+	tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);
+
+	*req_state = DBWRAP_REQ_QUEUED;
+
+	if (ctdbd_conn_has_async_sends(conn)) {
+		/*
+		 * Can't attempt direct write with messages already queued and
+		 * possibly in progress
+		 */
+		DLIST_ADD_END(conn->send_list, state);
+		return req;
+	}
+
+	/*
+	 * Attempt a direct write. If this returns short, schedule the
+	 * remaining data as an async write, otherwise we're already done.
+	 */
+
+	nwritten = writev(conn->fd, state->iov, state->iovcnt);
+	if (nwritten == state->packet_len) {
+		DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);
+
+		*req_state = DBWRAP_REQ_DISPATCHED;
+		tevent_req_done(req);
+		return tevent_req_post(req, ev);
+	}
+
+	if (nwritten == -1) {
+		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+			cluster_fatal("cluster write error\n");
+		}
+		nwritten = 0;
+	}
+
+	DBG_DEBUG("Posting async write of reqid [%" PRIu32 "] "
+		  "after short write [%zd]\n", reqid, nwritten);
+
+	ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
+	if (!ok) {
+		*req_state = DBWRAP_REQ_ERROR;
+		tevent_req_error(req, EIO);
+		return tevent_req_post(req, ev);
+	}
+
+	/*
+	 * As this is the first async write req we post, we must enable
+	 * fd-writable events.
+	 */
+	TEVENT_FD_WRITEABLE(conn->fde);
+	DLIST_ADD_END(conn->send_list, state);
+	return req;
+}
+
+static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
+{
+	struct ctdbd_connection *conn = state->conn;
+
+	if (conn == NULL) {
+		return 0;
+	}
+
+	if (state->req == NULL) {
+		DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
+			  state->reqid);
+		state->conn = NULL;
+		DLIST_REMOVE(conn->send_list, state);
+		return 0;
+	}
+
+	DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
+		  state->reqid);
+
+	talloc_reparent(state->req, conn, state);
+	state->req = NULL;
+	return -1;
+}
+
+static void ctdb_pkt_send_cleanup(struct tevent_req *req,
+				  enum tevent_req_state req_state)
+{
+	struct ctdb_pkt_send_state *state = tevent_req_data(
+		req, struct ctdb_pkt_send_state);
+	struct ctdbd_connection *conn = state->conn;
+	size_t missing_len = 0;
+
+	if (conn == NULL) {
+		return;
+	}
+
+	missing_len = iov_buflen(state->iov, state->iovcnt);
+	if (state->packet_len == missing_len) {
+		/*
+		 * We haven't yet started sending this one, so we can just
+		 * remove it from the pending list
+		 */
+		missing_len = 0;
+	}
+	if (missing_len != 0) {
+		uint8_t *buf = NULL;
+
+		if (req_state != TEVENT_REQ_RECEIVED) {
+			/*
+			 * Wait until the req_state is TEVENT_REQ_RECEIVED, as
+			 * that will be the final state when the request state
+			 * is talloc_free'd from tevent_req_received(). This
+			 * ensures we only run the following code *ONCE*!
+			 */
+			return;
+		}
+
+		DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
+			  state->reqid);
+		/*
+		 * A request in progress of being sent. Reparent the iov buffer
+		 * so we can continue sending the request. See also the comment
+		 * in ctdbd_parse_send() when copying the key buffer.
+		 */
+
+		buf = iov_concat(state, state->iov, state->iovcnt);
+		if (buf == NULL) {
+			cluster_fatal("iov_concat error\n");
+			return;
+		}
+
+		state->iovcnt = 1;
+		state->_iov.iov_base = buf;
+		state->_iov.iov_len = missing_len;
+		state->iov = &state->_iov;
+
+		talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
+		return;
+	}
+
+	DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);
+
+	state->conn = NULL;
+	DLIST_REMOVE(conn->send_list, state);
+
+	if (!ctdbd_conn_has_async_sends(conn)) {
+		DBG_DEBUG("No more sends, disabling fd-writable events\n");
+		TEVENT_FD_NOT_WRITEABLE(conn->fde);
+	}
+}
+
+static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
+{
+	struct ctdb_pkt_send_state *state = NULL;
+	ssize_t nwritten;
+	ssize_t iovlen;
+	bool ok;
+
+	DBG_DEBUG("send handler\n");
+
+	if (!ctdbd_conn_has_async_sends(conn)) {
+		DBG_WARNING("Writable fd-event without pending send\n");
+		TEVENT_FD_NOT_WRITEABLE(conn->fde);
+		return 0;
+	}
+
+	state = conn->send_list;
+	iovlen = iov_buflen(state->iov, state->iovcnt);
+
+	nwritten = writev(conn->fd, state->iov, state->iovcnt);
+	if (nwritten == -1) {
+		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+			DBG_ERR("writev failed: %s\n", strerror(errno));
+			cluster_fatal("cluster write error\n");
+		}
+		DBG_DEBUG("recoverable writev error, retry\n");
+		return 0;
+	}
+
+	if (nwritten < iovlen) {
+		DBG_DEBUG("short write\n");
+
+		ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
+		if (!ok) {
+			DBG_ERR("iov_advance failed\n");
+			if (state->req == NULL) {
+				TALLOC_FREE(state);
+				return 0;
+			}
+			tevent_req_error(state->req, EIO);
+			return 0;
+		}
+		return 0;
+	}
+
+	if (state->req == NULL) {
+		DBG_DEBUG("Finished sending cancelled reqid [%" PRIu32 "]\n",
+			  state->reqid);
+		TALLOC_FREE(state);
+		return 0;
+	}
+
+	DBG_DEBUG("Finished send request id [%" PRIu32 "]\n", state->reqid);
+
+	tevent_req_done(state->req);
+	return 0;
+}
+
+static int ctdb_pkt_send_recv(struct tevent_req *req)
+{
+	int ret;
+
+	if (tevent_req_is_unix_error(req, &ret)) {
+		tevent_req_received(req);
+		return ret;
+	}
+
+	tevent_req_received(req);
+	return 0;
+}
+
+struct ctdb_pkt_recv_state {
+	struct ctdb_pkt_recv_state *prev, *next;
+	struct tevent_context *ev;
+	struct ctdbd_connection *conn;
+
+	/* ctdb request id */
+	uint32_t reqid;
+
+	/* the associated tevent_req */
+	struct tevent_req *req;
+
+	/* pointer to allocated ctdb packet buffer */
+	struct ctdb_req_header *hdr;
+};
+
+static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
+				  enum tevent_req_state req_state);
+
+static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
+					     struct tevent_context *ev,
+					     struct ctdbd_connection *conn,
+					     uint32_t reqid)
+{
+	struct tevent_req *req = NULL;
+	struct ctdb_pkt_recv_state *state = NULL;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	*state = (struct ctdb_pkt_recv_state) {
+		.ev = ev,
+		.conn = conn,
+		.reqid = reqid,
+		.req = req,
+	};
+
+	tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);
+
+	/*
+	 * fd-readable event is always set for the fde, no need to deal with
+	 * that here.
+	 */
+
+	DLIST_ADD_END(conn->recv_list, state);
+	DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);
+
+	return req;
+}
+
+static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
+				  enum tevent_req_state req_state)
+{
+	struct ctdb_pkt_recv_state *state = tevent_req_data(
+		req, struct ctdb_pkt_recv_state);
+	struct ctdbd_connection *conn = state->conn;
+
+	if (conn == NULL) {
+		return;
+	}
+	state->conn = NULL;
+	DLIST_REMOVE(conn->recv_list, state);
+}
+
+static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
+{
+	struct ctdb_pkt_recv_state *state = NULL;
+	ssize_t nread;
+	ssize_t iovlen;
+	bool ok;
+
+	DBG_DEBUG("receive handler\n");
+
+	if (conn->read_state.iovs == NULL) {
+		conn->read_state.iov.iov_base = &conn->read_state.msglen;
+		conn->read_state.iov.iov_len = sizeof(conn->read_state.msglen);
+		conn->read_state.iovs = &conn->read_state.iov;
+		conn->read_state.iovcnt = 1;
+	}
+
+	iovlen = iov_buflen(conn->read_state.iovs, conn->read_state.iovcnt);
+
+	DBG_DEBUG("iovlen [%zd]\n", iovlen);
+
+	nread = readv(conn->fd, conn->read_state.iovs, conn->read_state.iovcnt);
+	if (nread == 0) {
+		cluster_fatal("cluster read error, peer closed connection\n");
+	}
+	if (nread == -1) {
+		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+			cluster_fatal("cluster read error\n");
+		}
+		DBG_DEBUG("recoverable error from readv, retry\n");
+		return 0;
+	}
+
+	if (nread < iovlen) {
+		DBG_DEBUG("iovlen [%zd] nread [%zd]\n", iovlen, nread);
+		ok = iov_advance(&conn->read_state.iovs,
+				 &conn->read_state.iovcnt,
+				 nread);
+		if (!ok) {
+			return EIO;
+		}
+		return 0;
+	}
+
+	conn->read_state.iovs = NULL;
+	conn->read_state.iovcnt = 0;
+
+	if (conn->read_state.hdr == NULL) {
+		/*
+		 * Going this way after reading the 4 initial byte message
+		 * length
+		 */
+		uint32_t msglen = conn->read_state.msglen;
+		uint8_t *readbuf = NULL;
+		size_t readlen;
+
+		DBG_DEBUG("msglen: %" PRIu32 "\n", msglen);
+
+		if (msglen < sizeof(struct ctdb_req_header)) {
+			DBG_ERR("short message %" PRIu32 "\n", msglen);
+			return EIO;
+		}
+
+		conn->read_state.hdr = talloc_size(conn, msglen);
+		if (conn->read_state.hdr == NULL) {
+			return ENOMEM;
+		}
+		conn->read_state.hdr->length = msglen;
+		talloc_set_name_const(conn->read_state.hdr,
+				      "struct ctdb_req_header");
+
+		readbuf = (uint8_t *)conn->read_state.hdr + sizeof(msglen);
+		readlen = msglen - sizeof(msglen);
+
+		conn->read_state.iov.iov_base = readbuf;
+		conn->read_state.iov.iov_len = readlen;
+		conn->read_state.iovs = &conn->read_state.iov;
+		conn->read_state.iovcnt = 1;
+
+		DBG_DEBUG("Scheduled packet read size %zd\n", readlen);
+		return 0;
+	}
+
+	/*
+	 * Searching a list here is expected to be cheap, as messages are
+	 * expected to be coming in more or less ordered and we should find the
+	 * waiting request near the beginning of the list.
+	 */
+	for (state = conn->recv_list; state != NULL; state = state->next) {
+		if (state->reqid == conn->read_state.hdr->reqid) {
+			break;
+		}
+	}
+
+	if (state == NULL) {
+		DBG_ERR("Discarding async ctdb reqid %u\n",
+			conn->read_state.hdr->reqid);
+		TALLOC_FREE(conn->read_state.hdr);
+		ZERO_STRUCT(conn->read_state);
+		return EINVAL;
+	}
+
+	DBG_DEBUG("Got reply for reqid [%" PRIu32 "]\n", state->reqid);
+
+	state->hdr = talloc_move(state, &conn->read_state.hdr);
+	ZERO_STRUCT(conn->read_state);
+	tevent_req_done(state->req);
+	return 0;
+}
+
+static int ctdb_pkt_recv_recv(struct tevent_req *req,
+			      TALLOC_CTX *mem_ctx,
+			      struct ctdb_req_header **_hdr)
+{
+	struct ctdb_pkt_recv_state *state = tevent_req_data(
+		req, struct ctdb_pkt_recv_state);
+	int error;
+
+	if (tevent_req_is_unix_error(req, &error)) {
+		DBG_ERR("ctdb_read_req failed %s\n", strerror(error));
+		tevent_req_received(req);
+		return error;
+	}
+
+	*_hdr = talloc_move(mem_ctx, &state->hdr);
+
+	tevent_req_received(req);
+	return 0;
+}
+
+static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+{
+	struct ctdb_pkt_recv_state *recv_state = NULL;
+	struct ctdb_pkt_send_state *send_state = NULL;
+
+	TALLOC_FREE(c->fde);
+	if (c->fd != -1) {
+		close(c->fd);
+		c->fd = -1;
+	}
+
+	TALLOC_FREE(c->read_state.hdr);
+	ZERO_STRUCT(c->read_state);
+
+	while ((send_state = c->send_list) != NULL) {
+		DLIST_REMOVE(c->send_list, send_state);
+		send_state->conn = NULL;
+		tevent_req_defer_callback(send_state->req, send_state->ev);
+		tevent_req_error(send_state->req, EIO);
+	}
+
+	while ((recv_state = c->recv_list) != NULL) {
+		DLIST_REMOVE(c->recv_list, recv_state);
+		recv_state->conn = NULL;
+		tevent_req_defer_callback(recv_state->req, recv_state->ev);
+		tevent_req_error(recv_state->req, EIO);
+	}
+
+	return 0;
+}
+
+struct ctdbd_parse_state {
+	struct tevent_context *ev;
+	struct ctdbd_connection *conn;
+	uint32_t reqid;
+	TDB_DATA key;
+	uint8_t _keybuf[64];
+	struct ctdb_req_call_old ctdb_req;
+	struct iovec iov[2];
+	void (*parser)(TDB_DATA key,
+		       TDB_DATA data,
+		       void *private_data);
+	void *private_data;
+	enum dbwrap_req_state *req_state;
+};
+
+static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
+static void ctdbd_parse_done(struct tevent_req *subreq);
+
+struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
+				    struct tevent_context *ev,
+				    struct ctdbd_connection *conn,
+				    uint32_t db_id,
+				    TDB_DATA key,
+				    bool local_copy,
+				    void (*parser)(TDB_DATA key,
+						   TDB_DATA data,
+						   void *private_data),
+				    void *private_data,
+				    enum dbwrap_req_state *req_state)
+{
+	struct tevent_req *req = NULL;
+	struct ctdbd_parse_state *state = NULL;
+	uint32_t flags;
+	uint32_t packet_length;
+	struct tevent_req *subreq = NULL;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
+	if (req == NULL) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return NULL;
+	}
+
+	*state = (struct ctdbd_parse_state) {
+		.ev = ev,
+		.conn = conn,
+		.reqid = ctdbd_next_reqid(conn),
+		.parser = parser,
+		.private_data = private_data,
+		.req_state = req_state,
+	};
+
+	flags = local_copy ? CTDB_WANT_READONLY : 0;
+	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
+
+	/*
+	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires that
+	 * all passed iov elements have a lifetime longer that the tevent_req
+	 * all passed iov elements have a lifetime longer than the tevent_req
+	 * returned by ctdb_pkt_send_send(). This is required to continue
+	 * sending the low level request into the ctdb socket, if a higher level
+	 * layer, without sending invalid packets to ctdb.
+	 */
+	if (key.dsize > sizeof(state->_keybuf)) {
+		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
+		if (tevent_req_nomem(state->key.dptr, req)) {
+			return tevent_req_post(req, ev);
+		}
+	} else {
+		memcpy(state->_keybuf, key.dptr, key.dsize);
+		state->key.dptr = state->_keybuf;
+	}
+	state->key.dsize = key.dsize;
+
+	state->ctdb_req.hdr.length       = packet_length;
+	state->ctdb_req.hdr.ctdb_magic   = CTDB_MAGIC;
+	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
+	state->ctdb_req.hdr.operation    = CTDB_REQ_CALL;
+	state->ctdb_req.hdr.reqid        = state->reqid;
+	state->ctdb_req.flags            = flags;
+	state->ctdb_req.callid           = CTDB_FETCH_FUNC;
+	state->ctdb_req.db_id            = db_id;
+	state->ctdb_req.keylen           = state->key.dsize;
+
+	state->iov[0].iov_base = &state->ctdb_req;
+	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
+	state->iov[1].iov_base = state->key.dptr;
+	state->iov[1].iov_len = state->key.dsize;
+
+	/*
+	 * Note that ctdb_pkt_send_send()
+	 * will modify state->iov using
+	 * iov_advance() without making a copy.
+	 */
+	subreq = ctdb_pkt_send_send(state,
+				    ev,
+				    conn,
+				    state->reqid,
+				    state->iov,
+				    ARRAY_SIZE(state->iov),
+				    req_state);
+	if (tevent_req_nomem(subreq, req)) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);
+
+	return req;
+}
+
+static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdbd_parse_state *state = tevent_req_data(
+		req, struct ctdbd_parse_state);
+	int ret;
+
+	ret = ctdb_pkt_send_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret));
+		return;
+	}
+
+	subreq = ctdb_pkt_recv_send(state,
+				    state->ev,
+				    state->conn,
+				    state->reqid);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+
+	*state->req_state = DBWRAP_REQ_DISPATCHED;
+	tevent_req_set_callback(subreq, ctdbd_parse_done, req);
+	return;
+}
+
+static void ctdbd_parse_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdbd_parse_state *state = tevent_req_data(
+		req, struct ctdbd_parse_state);
+	struct ctdb_req_header *hdr = NULL;
+	struct ctdb_reply_call_old *reply = NULL;
+	int ret;
+
+	DBG_DEBUG("async parse request finished\n");
+
+	ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
+		return;
+	}
+
+	if (hdr->operation != CTDB_REPLY_CALL) {
+		DBG_ERR("received invalid reply\n");
+		ctdb_packet_dump(hdr);
+		tevent_req_error(req, EIO);
+		return;
+	}
+
+	reply = (struct ctdb_reply_call_old *)hdr;
+
+	if (reply->datalen == 0) {
+		/*
+		 * Treat an empty record as non-existing
+		 */
+		tevent_req_error(req, ENOENT);
+		return;
+	}
+
+	state->parser(state->key,
+		      make_tdb_data(&reply->data[0], reply->datalen),
+		      state->private_data);
+
+	tevent_req_done(req);
+	return;
+}
+
+int ctdbd_parse_recv(struct tevent_req *req)
+{
+	int error;
+
+	if (tevent_req_is_unix_error(req, &error)) {
+		DBG_DEBUG("async parse returned %s\n", strerror(error));
+		tevent_req_received(req);
+		return error;
+	}
+
+	tevent_req_received(req);
+	return 0;
+}
-- 
2.9.3


From da87d11e5f6708b6a1bf0c426f6d3587ab96578b Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Thu, 23 Feb 2017 18:28:32 +0100
Subject: [PATCH 05/16] dbwrap_ctdb: factor out a
 db_ctdb_try_parse_local_record() function

Pair-programmed-with: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Ralph Boehme <slow at samba.org>
Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/lib/dbwrap/dbwrap_ctdb.c | 58 +++++++++++++++++++++++++---------------
 1 file changed, 36 insertions(+), 22 deletions(-)

diff --git a/source3/lib/dbwrap/dbwrap_ctdb.c b/source3/lib/dbwrap/dbwrap_ctdb.c
index 88db204..e0223ee 100644
--- a/source3/lib/dbwrap/dbwrap_ctdb.c
+++ b/source3/lib/dbwrap/dbwrap_ctdb.c
@@ -1254,22 +1254,11 @@ static void db_ctdb_parse_record_parser_nonpersistent(
 	}
 }
 
-static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
-				     void (*parser)(TDB_DATA key,
-						    TDB_DATA data,
-						    void *private_data),
-				     void *private_data)
+static NTSTATUS db_ctdb_try_parse_local_record(struct db_ctdb_ctx *ctx,
+					       TDB_DATA key,
+					       struct db_ctdb_parse_record_state *state)
 {
-	struct db_ctdb_ctx *ctx = talloc_get_type_abort(
-		db->private_data, struct db_ctdb_ctx);
-	struct db_ctdb_parse_record_state state;
 	NTSTATUS status;
-	int ret;
-
-	state.parser = parser;
-	state.private_data = private_data;
-	state.my_vnn = ctdbd_vnn(ctx->conn);
-	state.empty_record = false;
 
 	if (ctx->transaction != NULL) {
 		struct db_ctdb_transaction_handle *h = ctx->transaction;
@@ -1280,28 +1269,28 @@ static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
 		 */
 
 		found = parse_newest_in_marshall_buffer(
-			h->m_write, key, db_ctdb_parse_record_parser, &state);
+			h->m_write, key, db_ctdb_parse_record_parser, state);
 
 		if (found) {
 			return NT_STATUS_OK;
 		}
 	}
 
-	if (db->persistent) {
+	if (ctx->db->persistent) {
 		/*
 		 * Persistent db, but not found in the transaction buffer
 		 */
 		return db_ctdb_ltdb_parse(
-			ctx, key, db_ctdb_parse_record_parser, &state);
+			ctx, key, db_ctdb_parse_record_parser, state);
 	}
 
-	state.done = false;
-	state.ask_for_readonly_copy = false;
+	state->done = false;
+	state->ask_for_readonly_copy = false;
 
 	status = db_ctdb_ltdb_parse(
-		ctx, key, db_ctdb_parse_record_parser_nonpersistent, &state);
-	if (NT_STATUS_IS_OK(status) && state.done) {
-		if (state.empty_record) {
+		ctx, key, db_ctdb_parse_record_parser_nonpersistent, state);
+	if (NT_STATUS_IS_OK(status) && state->done) {
+		if (state->empty_record) {
 			/*
 			 * We know authoritatively, that this is an empty
 			 * record. Since ctdb does not distinguish between empty
@@ -1318,6 +1307,31 @@ static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
 		return NT_STATUS_OK;
 	}
 
+	return NT_STATUS_MORE_PROCESSING_REQUIRED;
+}
+
+static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
+				     void (*parser)(TDB_DATA key,
+						    TDB_DATA data,
+						    void *private_data),
+				     void *private_data)
+{
+	struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+		db->private_data, struct db_ctdb_ctx);
+	struct db_ctdb_parse_record_state state;
+	NTSTATUS status;
+	int ret;
+
+	state.parser = parser;
+	state.private_data = private_data;
+	state.my_vnn = ctdbd_vnn(ctx->conn);
+	state.empty_record = false;
+
+	status = db_ctdb_try_parse_local_record(ctx, key, &state);
+	if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
+		return status;
+	}
+
 	ret = ctdbd_parse(ctx->conn, ctx->db_id, key,
 			  state.ask_for_readonly_copy, parser, private_data);
 	if (ret != 0) {
-- 
2.9.3


From e7484e07cce12e928c2babd8f39b0b39c15de193 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Wed, 21 Dec 2016 08:38:25 +0100
Subject: [PATCH 06/16] dbwrap_ctdb: implement parse_record_send()/recv()

This mainly works like the sync version, but calls ctdbd_parse_send/recv
instead.

We use one global ctdb connection that is used exclusively for async
requests.

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/lib/ctdb_dummy.c         |   5 ++
 source3/lib/dbwrap/dbwrap_ctdb.c | 161 +++++++++++++++++++++++++++++++++++++++
 source3/lib/dbwrap/dbwrap_ctdb.h |   1 +
 source3/lib/util.c               |  12 +++
 4 files changed, 179 insertions(+)

diff --git a/source3/lib/ctdb_dummy.c b/source3/lib/ctdb_dummy.c
index 8b617ba..0b1acb7 100644
--- a/source3/lib/ctdb_dummy.c
+++ b/source3/lib/ctdb_dummy.c
@@ -94,3 +94,8 @@ struct ctdbd_connection *messaging_ctdbd_connection(void)
 {
 	return NULL;
 }
+
+int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+	return ENOSYS;
+}
diff --git a/source3/lib/dbwrap/dbwrap_ctdb.c b/source3/lib/dbwrap/dbwrap_ctdb.c
index e0223ee..87ac8e1 100644
--- a/source3/lib/dbwrap/dbwrap_ctdb.c
+++ b/source3/lib/dbwrap/dbwrap_ctdb.c
@@ -35,6 +35,7 @@
 #include "g_lock.h"
 #include "messages.h"
 #include "lib/cluster_support.h"
+#include "lib/util/tevent_ntstatus.h"
 
 struct db_ctdb_transaction_handle {
 	struct db_ctdb_ctx *ctx;
@@ -68,6 +69,59 @@ struct db_ctdb_rec {
 	struct timeval lock_time;
 };
 
+struct ctdb_async_ctx {
+	bool initialized;
+	struct ctdbd_connection *async_conn;
+};
+
+static struct ctdb_async_ctx ctdb_async_ctx;
+
+static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
+					struct tevent_context *ev,
+					bool reinit)
+{
+	int ret;
+
+	if (reinit) {
+		TALLOC_FREE(ctdb_async_ctx.async_conn);
+		ctdb_async_ctx.initialized = false;
+	}
+
+	if (ctdb_async_ctx.initialized) {
+		return 0;
+	}
+
+	become_root();
+	ret = ctdbd_init_connection(mem_ctx,
+				    lp_ctdbd_socket(),
+				    lp_ctdb_timeout(),
+				    &ctdb_async_ctx.async_conn);
+	unbecome_root();
+
+	if (ctdb_async_ctx.async_conn == NULL) {
+		DBG_ERR("ctdbd_init_connection failed\n");
+		return EIO;
+	}
+
+	ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
+	if (ret != 0) {
+		DBG_ERR("ctdbd_setup_fde failed\n");
+		return ret;
+	}
+
+	return 0;
+}
+
+static int ctdb_async_ctx_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+	return ctdb_async_ctx_init_internal(mem_ctx, ev, false);
+}
+
+int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+	return ctdb_async_ctx_init_internal(mem_ctx, ev, true);
+}
+
 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
 {
 	enum TDB_ERROR tret = tdb_error(tdb);
@@ -1350,6 +1404,102 @@ static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
 	return NT_STATUS_OK;
 }
 
+static void db_ctdb_parse_record_done(struct tevent_req *subreq);
+
+static struct tevent_req *db_ctdb_parse_record_send(
+	TALLOC_CTX *mem_ctx,
+	struct tevent_context *ev,
+	struct db_context *db,
+	TDB_DATA key,
+	void (*parser)(TDB_DATA key,
+		       TDB_DATA data,
+		       void *private_data),
+	void *private_data,
+	enum dbwrap_req_state *req_state)
+{
+	struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+		db->private_data, struct db_ctdb_ctx);
+	struct tevent_req *req = NULL;
+	struct tevent_req *subreq = NULL;
+	struct db_ctdb_parse_record_state *state = NULL;
+	NTSTATUS status;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct db_ctdb_parse_record_state);
+	if (req == NULL) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return NULL;
+
+	}
+
+	*state = (struct db_ctdb_parse_record_state) {
+		.parser = parser,
+		.private_data = private_data,
+		.my_vnn = ctdbd_vnn(ctx->conn),
+		.empty_record = false,
+	};
+
+	status = db_ctdb_try_parse_local_record(ctx, key, state);
+	if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
+		if (tevent_req_nterror(req, status)) {
+			*req_state = DBWRAP_REQ_ERROR;
+			return tevent_req_post(req, ev);
+		}
+		*req_state = DBWRAP_REQ_DONE;
+		tevent_req_done(req);
+		return tevent_req_post(req, ev);
+	}
+
+	subreq = ctdbd_parse_send(state,
+				  ev,
+				  ctdb_async_ctx.async_conn,
+				  ctx->db_id,
+				  key,
+				  state->ask_for_readonly_copy,
+				  parser,
+				  private_data,
+				  req_state);
+	if (tevent_req_nomem(subreq, req)) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, db_ctdb_parse_record_done, req);
+
+	return req;
+}
+
+static void db_ctdb_parse_record_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	int ret;
+
+	ret = ctdbd_parse_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (ret != 0) {
+		if (ret == ENOENT) {
+			/*
+			 * This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
+			 * upper layers expect NT_STATUS_NOT_FOUND for "no
+			 * record around". We need to convert dbwrap to 0/errno
+			 * away from NTSTATUS ... :-)
+			 */
+			tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
+			return;
+		}
+		tevent_req_nterror(req, map_nt_error_from_unix(ret));
+		return;
+	}
+
+	tevent_req_done(req);
+	return;
+}
+
+static NTSTATUS db_ctdb_parse_record_recv(struct tevent_req *req)
+{
+	return tevent_req_simple_recv_ntstatus(req);
+}
+
 struct traverse_state {
 	struct db_context *db;
 	int (*fn)(struct db_record *rec, void *private_data);
@@ -1675,6 +1825,15 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
 	tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
 		TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
 
+	if (!result->persistent) {
+		ret = ctdb_async_ctx_init(NULL, messaging_tevent_context(msg_ctx));
+		if (ret != 0) {
+			DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret));
+			TALLOC_FREE(result);
+			return NULL;
+		}
+	}
+
 	if (!result->persistent &&
 	    (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
 	{
@@ -1745,6 +1904,8 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
 	result->fetch_locked = db_ctdb_fetch_locked;
 	result->try_fetch_locked = db_ctdb_try_fetch_locked;
 	result->parse_record = db_ctdb_parse_record;
+	result->parse_record_send = db_ctdb_parse_record_send;
+	result->parse_record_recv = db_ctdb_parse_record_recv;
 	result->traverse = db_ctdb_traverse;
 	result->traverse_read = db_ctdb_traverse_read;
 	result->get_seqnum = db_ctdb_get_seqnum;
diff --git a/source3/lib/dbwrap/dbwrap_ctdb.h b/source3/lib/dbwrap/dbwrap_ctdb.h
index 3f04702..42c831f 100644
--- a/source3/lib/dbwrap/dbwrap_ctdb.h
+++ b/source3/lib/dbwrap/dbwrap_ctdb.h
@@ -36,5 +36,6 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
 				int open_flags, mode_t mode,
 				enum dbwrap_lock_order lock_order,
 				uint64_t dbwrap_flags);
+int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev);
 
 #endif /* __DBWRAP_CTDB_H__ */
diff --git a/source3/lib/util.c b/source3/lib/util.c
index d525be6..fb50884 100644
--- a/source3/lib/util.c
+++ b/source3/lib/util.c
@@ -35,6 +35,7 @@
 #include "lib/util/sys_rw.h"
 #include "lib/util/sys_rw_data.h"
 #include "lib/util/util_process.h"
+#include "lib/dbwrap/dbwrap_ctdb.h"
 
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
@@ -437,6 +438,7 @@ NTSTATUS reinit_after_fork(struct messaging_context *msg_ctx,
 			   const char *comment)
 {
 	NTSTATUS status = NT_STATUS_OK;
+	int ret;
 
 	if (reinit_after_fork_pipe[1] != -1) {
 		close(reinit_after_fork_pipe[1]);
@@ -478,6 +480,16 @@ NTSTATUS reinit_after_fork(struct messaging_context *msg_ctx,
 			DEBUG(0,("messaging_reinit() failed: %s\n",
 				 nt_errstr(status)));
 		}
+
+		if (lp_clustering()) {
+			ret = ctdb_async_ctx_reinit(
+				NULL, messaging_tevent_context(msg_ctx));
+			if (ret != 0) {
+				DBG_ERR("ctdb_async_ctx_reinit failed: %s\n",
+					strerror(ret));
+				return map_nt_error_from_unix(ret);
+			}
+		}
 	}
 
 	if (comment) {
-- 
2.9.3


From 7841360f18977b01c69724e5620e17c2d21db7fc Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 27 Dec 2016 09:13:37 +0100
Subject: [PATCH 07/16] dbwrap: add dbwrap_parse_record_send/recv

The req_state parameter tells the caller whether the async request is
blocked in a full send queue:

req_state >= DBWRAP_REQ_DISPATCHED := request is dispatched
req_state < DBWRAP_REQ_DISPATCHED := send queue is full

This is useful in a clustered Samba environment where the async dbwrap
request is sent over a socket to the local ctdbd.

If the send queue is full and the caller was issuing multiple async
dbwrap requests in a loop, the caller knows it's probably time to stop
sending requests for now and try again later.

This will be used in subsequent commits in
smbd_smb2_query_directory_send() when implementing async write time
updates. Directories may contain umpteen files, so we may send many
requests to ctdb without going through tevent and reading the responses,
which has the potential to deadlock.
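
A sketch of the intended calling pattern in such a loop (the surrounding
bookkeeping, keys[], entries[], my_parser and my_done_fn, is invented for
illustration):

    enum dbwrap_req_state req_state;
    struct tevent_req *subreq;
    size_t i;

    for (i = 0; i < num_entries; i++) {
            subreq = dbwrap_parse_record_send(state, ev, db, keys[i],
                                              my_parser, &entries[i],
                                              &req_state);
            if (subreq == NULL) {
                    break;
            }
            tevent_req_set_callback(subreq, my_done_fn, req);

            if (req_state < DBWRAP_REQ_DISPATCHED) {
                    /* ctdb send queue is full, back off for now */
                    break;
            }
    }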

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 lib/dbwrap/dbwrap.c      | 112 +++++++++++++++++++++++++++++++++++++++++++++++
 lib/dbwrap/dbwrap.h      |  33 ++++++++++++++
 lib/dbwrap/wscript_build |   2 +-
 3 files changed, 146 insertions(+), 1 deletion(-)

diff --git a/lib/dbwrap/dbwrap.c b/lib/dbwrap/dbwrap.c
index 68e5608..025d463 100644
--- a/lib/dbwrap/dbwrap.c
+++ b/lib/dbwrap/dbwrap.c
@@ -26,6 +26,7 @@
 #include "dbwrap/dbwrap.h"
 #include "dbwrap/dbwrap_private.h"
 #include "lib/util/util_tdb.h"
+#include "lib/util/tevent_ntstatus.h"
 
 /*
  * Fall back using fetch if no genuine exists operation is provided
@@ -368,6 +369,117 @@ NTSTATUS dbwrap_parse_record(struct db_context *db, TDB_DATA key,
 	return db->parse_record(db, key, parser, private_data);
 }
 
+struct dbwrap_parse_record_state {
+	struct db_context *db;
+	TDB_DATA key;
+	uint8_t _keybuf[64];
+};
+
+static void dbwrap_parse_record_done(struct tevent_req *subreq);
+
+struct tevent_req *dbwrap_parse_record_send(
+	TALLOC_CTX *mem_ctx,
+	struct tevent_context *ev,
+	struct db_context *db,
+	TDB_DATA key,
+	void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
+	void *private_data,
+	enum dbwrap_req_state *req_state)
+{
+	struct tevent_req *req = NULL;
+	struct tevent_req *subreq = NULL;
+	struct dbwrap_parse_record_state *state = NULL;
+	NTSTATUS status;
+
+	req = tevent_req_create(mem_ctx, &state, struct dbwrap_parse_record_state);
+	if (req == NULL) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return NULL;
+	}
+
+	*state = (struct dbwrap_parse_record_state) {
+		.db = db,
+	};
+
+	if (parser == NULL) {
+		parser = dbwrap_null_parser;
+	}
+
+	*req_state = DBWRAP_REQ_INIT;
+
+	if (db->parse_record_send == NULL) {
+		/*
+		 * Backend doesn't implement async version, call sync one
+		 */
+		status = db->parse_record(db, key, parser, private_data);
+		if (tevent_req_nterror(req, status)) {
+			*req_state = DBWRAP_REQ_DONE;
+			return tevent_req_post(req, ev);
+		}
+
+		*req_state = DBWRAP_REQ_DONE;
+		tevent_req_done(req);
+		return tevent_req_post(req, ev);
+	}
+
+	/*
+	 * Copy the key into our state ensuring the key data buffer is always
+	 * available to the dbwrap backend over the entire lifetime of the
+	 * async request. Otherwise the caller might have free'd the key buffer.
+	 */
+	if (key.dsize > sizeof(state->_keybuf)) {
+		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
+		if (tevent_req_nomem(state->key.dptr, req)) {
+			return tevent_req_post(req, ev);
+		}
+	} else {
+		memcpy(state->_keybuf, key.dptr, key.dsize);
+		state->key.dptr = state->_keybuf;
+	}
+	state->key.dsize = key.dsize;
+
+	subreq = db->parse_record_send(state,
+				       ev,
+				       db,
+				       state->key,
+				       parser,
+				       private_data,
+				       req_state);
+	if (tevent_req_nomem(subreq, req)) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return tevent_req_post(req, ev);
+	}
+
+	tevent_req_set_callback(subreq,
+				dbwrap_parse_record_done,
+				req);
+	return req;
+}
+
+static void dbwrap_parse_record_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct dbwrap_parse_record_state *state = tevent_req_data(
+		req, struct dbwrap_parse_record_state);
+	NTSTATUS status;
+
+	status = state->db->parse_record_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (!NT_STATUS_IS_OK(status)) {
+		tevent_req_nterror(req, status);
+		return;
+	}
+
+	tevent_req_done(req);
+	return;
+}
+
+NTSTATUS dbwrap_parse_record_recv(struct tevent_req *req)
+{
+	return tevent_req_simple_recv_ntstatus(req);
+}
+
 int dbwrap_wipe(struct db_context *db)
 {
 	if (db->wipe == NULL) {
diff --git a/lib/dbwrap/dbwrap.h b/lib/dbwrap/dbwrap.h
index 936e662..fac65ee 100644
--- a/lib/dbwrap/dbwrap.h
+++ b/lib/dbwrap/dbwrap.h
@@ -22,6 +22,7 @@
 
 #include "replace.h"
 #include <talloc.h>
+#include <tevent.h>
 #include "libcli/util/ntstatus.h"
 #include "tdb.h"
 #include "lib/param/loadparm.h"
@@ -98,6 +99,38 @@ NTSTATUS dbwrap_parse_record(struct db_context *db, TDB_DATA key,
 			     void (*parser)(TDB_DATA key, TDB_DATA data,
 					    void *private_data),
 			     void *private_data);
+/**
+ * Async implementation of dbwrap_parse_record
+ *
+ * @param[in]  mem_ctx      talloc memory context to use.
+ *
+ * @param[in]  ev           tevent context to use
+ *
+ * @param[in]  db           Database to query
+ *
+ * @param[in]  key          Record key, the function makes a copy of this
+ *
+ * @param[in]  parser       Parser callback function
+ *
+ * @param[in]  private_data Private data for the callback function
+ *
+ * @param[out] req_state    Pointer to a enum dbwrap_req_state variable
+ *
+ * @note req_state is updated in the send function. To determine the final
+ * result of the request the caller should therefore not rely on req_state. The
+ * primary use case is to give the caller an indication whether the request is
+ * already sent to ctdb (DBWRAP_REQ_DISPATCHED) or if it's still stuck in the
+ * sendqueue (DBWRAP_REQ_QUEUED).
+ **/
+struct tevent_req *dbwrap_parse_record_send(
+	TALLOC_CTX *mem_ctx,
+	struct tevent_context *ev,
+	struct db_context *db,
+	TDB_DATA key,
+	void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
+	void *private_data,
+	enum dbwrap_req_state *req_state);
+NTSTATUS dbwrap_parse_record_recv(struct tevent_req *req);
 int dbwrap_wipe(struct db_context *db);
 int dbwrap_check(struct db_context *db);
 int dbwrap_get_seqnum(struct db_context *db);
diff --git a/lib/dbwrap/wscript_build b/lib/dbwrap/wscript_build
index b719a60..83e5895 100644
--- a/lib/dbwrap/wscript_build
+++ b/lib/dbwrap/wscript_build
@@ -1,6 +1,6 @@
 SRC = '''dbwrap.c dbwrap_util.c dbwrap_rbt.c dbwrap_cache.c dbwrap_tdb.c
          dbwrap_local_open.c'''
-DEPS= '''samba-util util_tdb samba-errors tdb tdb-wrap samba-hostconfig'''
+DEPS= '''samba-util util_tdb samba-errors tdb tdb-wrap samba-hostconfig tevent tevent-util'''
 
 bld.SAMBA_LIBRARY('dbwrap',
                   source=SRC,
-- 
2.9.3


From 01631bfa15bf8b10907567c3684cfa343aaf764c Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Mon, 26 Dec 2016 10:15:11 +0100
Subject: [PATCH 08/16] dbwrap_watch: add parse_record_send/recv wrappers

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/lib/dbwrap/dbwrap_watch.c | 84 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 84 insertions(+)

diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c
index efff478..6057bf4 100644
--- a/source3/lib/dbwrap/dbwrap_watch.c
+++ b/source3/lib/dbwrap/dbwrap_watch.c
@@ -552,6 +552,88 @@ static NTSTATUS dbwrap_watched_parse_record(
 	return NT_STATUS_OK;
 }
 
+static void dbwrap_watched_parse_record_done(struct tevent_req *subreq);
+
+static struct tevent_req *dbwrap_watched_parse_record_send(
+	TALLOC_CTX *mem_ctx,
+	struct tevent_context *ev,
+	struct db_context *db,
+	TDB_DATA key,
+	void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
+	void *private_data,
+	enum dbwrap_req_state *req_state)
+{
+	struct db_watched_ctx *ctx = talloc_get_type_abort(
+		db->private_data, struct db_watched_ctx);
+	struct tevent_req *req = NULL;
+	struct tevent_req *subreq = NULL;
+	struct dbwrap_watched_parse_record_state *state = NULL;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct dbwrap_watched_parse_record_state);
+	if (req == NULL) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return NULL;
+	}
+
+	*state = (struct dbwrap_watched_parse_record_state) {
+		.parser = parser,
+		.private_data = private_data,
+		.deleted = false,
+	};
+
+	subreq = dbwrap_parse_record_send(state,
+					  ev,
+					  ctx->backend,
+					  key,
+					  dbwrap_watched_parse_record_parser,
+					  state,
+					  req_state);
+	if (tevent_req_nomem(subreq, req)) {
+		*req_state = DBWRAP_REQ_ERROR;
+		return tevent_req_post(req, ev);
+	}
+
+	tevent_req_set_callback(subreq, dbwrap_watched_parse_record_done, req);
+	return req;
+}
+
+static void dbwrap_watched_parse_record_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct dbwrap_watched_parse_record_state *state = tevent_req_data(
+		req, struct dbwrap_watched_parse_record_state);
+	NTSTATUS status;
+
+	status = dbwrap_parse_record_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (tevent_req_nterror(req, status)) {
+		return;
+	}
+
+	if (state->deleted) {
+		tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
+		return;
+	}
+
+	tevent_req_done(req);
+	return;
+}
+
+static NTSTATUS dbwrap_watched_parse_record_recv(struct tevent_req *req)
+{
+	NTSTATUS status;
+
+	if (tevent_req_is_nterror(req, &status)) {
+		tevent_req_received(req);
+		return status;
+	}
+
+	tevent_req_received(req);
+	return NT_STATUS_OK;
+}
+
 static int dbwrap_watched_exists(struct db_context *db, TDB_DATA key)
 {
 	struct db_watched_ctx *ctx = talloc_get_type_abort(
@@ -601,6 +683,8 @@ struct db_context *db_open_watched(TALLOC_CTX *mem_ctx,
 	db->transaction_commit = dbwrap_watched_transaction_commit;
 	db->transaction_cancel = dbwrap_watched_transaction_cancel;
 	db->parse_record = dbwrap_watched_parse_record;
+	db->parse_record_send = dbwrap_watched_parse_record_send;
+	db->parse_record_recv = dbwrap_watched_parse_record_recv;
 	db->exists = dbwrap_watched_exists;
 	db->id = dbwrap_watched_id;
 	db->name = dbwrap_name(ctx->backend);
-- 
2.9.3


From 81c9aa7094a9fa6b1b7ee48c3ebd0dbbacf483d5 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Wed, 4 Jan 2017 08:00:29 +0100
Subject: [PATCH 09/16] s3/locking: add fetch_share_mode_send/recv

The boolean out parameter "queued" tells the caller whether the
async request is blocked in a full send queue:

false := request is dispatched
true  := send queue is full, request waiting to be dispatched

This is useful in a clustered Samba environment where the async dbwrap
request is sent over a socket to the local ctdbd.

If the send queue is full and the caller was issuing multiple async
dbwrap requests in a loop, the caller knows it's probably time to stop
sending requests for now and try again later.

This will be used in subsequent commits in
smbd_smb2_query_directory_send() when implementing async write time
updates. Directories may contain umpteen files, so we send many
requests to ctdb without going through tevent and reading the
responses, which has the potential to deadlock.
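
For illustration, here is a minimal sketch of how a caller might use the
"queued" flag to throttle itself. Only fetch_share_mode_send/recv() are
from this patch; the my_caller_* names are made up and error handling is
reduced to the bare minimum:

struct my_caller_state {
	uint64_t outstanding;
};

static void my_caller_fetch_done(struct tevent_req *subreq);

/*
 * Issue one async share mode fetch per file id, stop issuing more as
 * soon as ctdb's send queue fills up.
 */
static bool my_caller_queue_fetches(struct tevent_req *req,
				    struct tevent_context *ev,
				    const struct file_id *ids,
				    size_t num_ids)
{
	struct my_caller_state *state = tevent_req_data(
		req, struct my_caller_state);
	size_t i;

	for (i = 0; i < num_ids; i++) {
		struct tevent_req *subreq = NULL;
		bool queued = false;

		subreq = fetch_share_mode_send(state, ev, ids[i], &queued);
		if (subreq == NULL) {
			return false;
		}
		tevent_req_set_callback(subreq, my_caller_fetch_done, req);
		state->outstanding++;

		if (queued) {
			/*
			 * The ctdb send queue is full and this request is
			 * still waiting to be dispatched: stop issuing
			 * further requests and let the event loop drain
			 * the queue first.
			 */
			break;
		}
	}
	return true;
}

static void my_caller_fetch_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct my_caller_state *state = tevent_req_data(
		req, struct my_caller_state);
	struct share_mode_lock *lck = NULL;
	NTSTATUS status;

	status = fetch_share_mode_recv(subreq, state, &lck);
	TALLOC_FREE(subreq);
	state->outstanding--;

	if (NT_STATUS_IS_OK(status)) {
		/* lck->data holds the share mode data for this file */
		TALLOC_FREE(lck);
	} else if (!NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
		tevent_req_nterror(req, status);
		return;
	}

	if (state->outstanding == 0) {
		tevent_req_done(req);
	}
}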

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/locking/proto.h           |   7 +++
 source3/locking/share_mode_lock.c | 125 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 132 insertions(+)

diff --git a/source3/locking/proto.h b/source3/locking/proto.h
index 17cb1cd..967af02 100644
--- a/source3/locking/proto.h
+++ b/source3/locking/proto.h
@@ -153,6 +153,13 @@ struct share_mode_lock *get_share_mode_lock(
 	const struct timespec *old_write_time);
 struct share_mode_lock *fetch_share_mode_unlocked(TALLOC_CTX *mem_ctx,
 						  struct file_id id);
+struct tevent_req *fetch_share_mode_send(TALLOC_CTX *mem_ctx,
+					 struct tevent_context *ev,
+					 struct file_id id,
+					 bool *queued);
+NTSTATUS fetch_share_mode_recv(struct tevent_req *req,
+			       TALLOC_CTX *mem_ctx,
+			       struct share_mode_lock **_lck);
 bool rename_share_filename(struct messaging_context *msg_ctx,
 			struct share_mode_lock *lck,
 			struct file_id id,
diff --git a/source3/locking/share_mode_lock.c b/source3/locking/share_mode_lock.c
index 16d8ed4..0333b0d 100644
--- a/source3/locking/share_mode_lock.c
+++ b/source3/locking/share_mode_lock.c
@@ -50,6 +50,7 @@
 #include "source3/lib/dbwrap/dbwrap_watch.h"
 #include "locking/leases_db.h"
 #include "../lib/util/memcache.h"
+#include "lib/util/tevent_ntstatus.h"
 
 #undef DBGC_CLASS
 #define DBGC_CLASS DBGC_LOCKING
@@ -667,6 +668,130 @@ struct share_mode_lock *fetch_share_mode_unlocked(TALLOC_CTX *mem_ctx,
 	return state.lck;
 }
 
+static void fetch_share_mode_done(struct tevent_req *subreq);
+
+struct fetch_share_mode_state {
+	struct file_id id;
+	TDB_DATA key;
+	struct share_mode_lock *lck;
+	enum dbwrap_req_state req_state;
+};
+
+/**
+ * @brief Get a share_mode_lock without locking or refcounting
+ *
+ * This can be used in a clustered Samba environment where the async dbwrap
+ * request is sent over a socket to the local ctdbd. If the send queue is full
+ * and the caller was issuing multiple async dbwrap requests in a loop, the
+ * caller knows it's probably time to stop sending requests for now and try
+ * again later.
+ *
+ * @param[in]  mem_ctx The talloc memory context to use.
+ *
+ * @param[in]  ev      The event context to work on.
+ *
+ * @param[in]  id      The file id for the locking.tdb key
+ *
+ * @param[out] queued  This boolean out parameter tells the caller whether the
+ *                     async request is blocked in a full send queue:
+ *
+ *                     false := request is dispatched
+ *
+ *                     true  := send queue is full, request waiting to be
+ *                              dispatched
+ *
+ * @return             The new async request, NULL on error.
+ **/
+struct tevent_req *fetch_share_mode_send(TALLOC_CTX *mem_ctx,
+					 struct tevent_context *ev,
+					 struct file_id id,
+					 bool *queued)
+{
+	struct tevent_req *req = NULL;
+	struct fetch_share_mode_state *state = NULL;
+	struct tevent_req *subreq = NULL;
+
+	*queued = false;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct fetch_share_mode_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->id = id;
+	state->key = locking_key(&state->id);
+	state->lck = talloc_zero(state, struct share_mode_lock);
+	if (tevent_req_nomem(state->lck, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	subreq = dbwrap_parse_record_send(state,
+					  ev,
+					  lock_db,
+					  state->key,
+					  fetch_share_mode_unlocked_parser,
+					  state->lck,
+					  &state->req_state);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, fetch_share_mode_done, req);
+
+	if (state->req_state < DBWRAP_REQ_DISPATCHED) {
+		*queued = true;
+	}
+	return req;
+}
+
+static void fetch_share_mode_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	NTSTATUS status;
+
+	status = dbwrap_parse_record_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (tevent_req_nterror(req, status)) {
+		return;
+	}
+
+	tevent_req_done(req);
+	return;
+}
+
+NTSTATUS fetch_share_mode_recv(struct tevent_req *req,
+			       TALLOC_CTX *mem_ctx,
+			       struct share_mode_lock **_lck)
+{
+	struct fetch_share_mode_state *state = tevent_req_data(
+		req, struct fetch_share_mode_state);
+	struct share_mode_lock *lck = NULL;
+
+	NTSTATUS status;
+
+	if (tevent_req_is_nterror(req, &status)) {
+		tevent_req_received(req);
+		return status;
+	}
+
+	if (state->lck->data == NULL) {
+		tevent_req_received(req);
+		return NT_STATUS_NOT_FOUND;
+	}
+
+	lck = talloc_move(mem_ctx, &state->lck);
+
+	if (DEBUGLEVEL >= 10) {
+		DBG_DEBUG("share_mode_data:\n");
+		NDR_PRINT_DEBUG(share_mode_data, lck->data);
+	}
+
+	*_lck = lck;
+	tevent_req_received(req);
+	return NT_STATUS_OK;
+}
+
 struct share_mode_forall_state {
 	int (*fn)(struct file_id fid, const struct share_mode_data *data,
 		  void *private_data);
-- 
2.9.3


From b6aa2ed16cec3ecd9dfaf2f175d80f53bef7bb1e Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Thu, 22 Dec 2016 14:53:17 +0100
Subject: [PATCH 10/16] s3/smbd: add file_id return arg to
 smbd_dirptr_lanman2_entry

Not used for now, needed for async write_time updates in
smbd_smb2_query_directory_send().

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/smbd/globals.h              |  3 ++-
 source3/smbd/smb2_query_directory.c |  1 +
 source3/smbd/trans2.c               | 10 ++++++++--
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/source3/smbd/globals.h b/source3/smbd/globals.h
index 67d3a89..22e364c 100644
--- a/source3/smbd/globals.h
+++ b/source3/smbd/globals.h
@@ -206,7 +206,8 @@ NTSTATUS smbd_dirptr_lanman2_entry(TALLOC_CTX *ctx,
 			       int space_remaining,
 			       bool *got_exact_match,
 			       int *_last_entry_off,
-			       struct ea_list *name_list);
+			       struct ea_list *name_list,
+			       struct file_id *file_id);
 
 NTSTATUS smbd_calculate_access_mask(connection_struct *conn,
 				    const struct smb_filename *smb_fname,
diff --git a/source3/smbd/smb2_query_directory.c b/source3/smbd/smb2_query_directory.c
index 2af029b..dc71642 100644
--- a/source3/smbd/smb2_query_directory.c
+++ b/source3/smbd/smb2_query_directory.c
@@ -478,6 +478,7 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 					       space_remaining,
 					       &got_exact_match,
 					       &last_entry_off,
+					       NULL,
 					       NULL);
 
 		off = (int)PTR_DIFF(pdata, base_data);
diff --git a/source3/smbd/trans2.c b/source3/smbd/trans2.c
index 923fed4..e8346ba 100644
--- a/source3/smbd/trans2.c
+++ b/source3/smbd/trans2.c
@@ -2456,7 +2456,8 @@ NTSTATUS smbd_dirptr_lanman2_entry(TALLOC_CTX *ctx,
 			       int space_remaining,
 			       bool *got_exact_match,
 			       int *_last_entry_off,
-			       struct ea_list *name_list)
+			       struct ea_list *name_list,
+			       struct file_id *file_id)
 {
 	const char *p;
 	const char *mask = NULL;
@@ -2537,6 +2538,11 @@ NTSTATUS smbd_dirptr_lanman2_entry(TALLOC_CTX *ctx,
 		DEBUG(1,("Conversion error: illegal character: %s\n",
 			 smb_fname_str_dbg(smb_fname)));
 	}
+
+	if (file_id != NULL) {
+		*file_id = vfs_file_id_from_sbuf(conn, &smb_fname->st);
+	}
+
 	TALLOC_FREE(fname);
 	TALLOC_FREE(smb_fname);
 	if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
@@ -2584,7 +2590,7 @@ static NTSTATUS get_lanman2_dir_entry(TALLOC_CTX *ctx,
 					 ppdata, base_data, end_data,
 					 space_remaining,
 					 got_exact_match,
-					 last_entry_off, name_list);
+					 last_entry_off, name_list, NULL);
 }
 
 /****************************************************************************
-- 
2.9.3


From b806d73ce98fee508e1ecc6056197f5cc83ede46 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Fri, 23 Dec 2016 19:51:49 +0100
Subject: [PATCH 11/16] s3/smbd: ask_sharemode is not needed for info_level
 SMB_FIND_FILE_NAMES_INFO

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/smbd/smb2_query_directory.c | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/source3/smbd/smb2_query_directory.c b/source3/smbd/smb2_query_directory.c
index dc71642..0b84f84 100644
--- a/source3/smbd/smb2_query_directory.c
+++ b/source3/smbd/smb2_query_directory.c
@@ -225,7 +225,7 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 	uint32_t num = 0;
 	uint32_t dirtype = FILE_ATTRIBUTE_HIDDEN | FILE_ATTRIBUTE_SYSTEM | FILE_ATTRIBUTE_DIRECTORY;
 	bool dont_descend = false;
-	bool ask_sharemode = true;
+	bool ask_sharemode = false;
 	bool wcard_has_wild = false;
 	struct tm tm;
 	char *p;
@@ -450,9 +450,17 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 		dont_descend = true;
 	}
 
-	ask_sharemode = lp_parm_bool(SNUM(conn),
-				     "smbd", "search ask sharemode",
-				     true);
+	/*
+	 * SMB_FIND_FILE_NAMES_INFO doesn't need stat information
+	 *
+	 * This may change when we try to improve the delete on close
+	 * handling in future.
+	 */
+	if (info_level != SMB_FIND_FILE_NAMES_INFO) {
+		ask_sharemode = lp_parm_bool(SNUM(conn),
+					     "smbd", "search ask sharemode",
+					     true);
+	}
 
 	while (true) {
 		bool got_exact_match = false;
-- 
2.9.3


From 72091dbe4ce7ae2c36c05a27e95beb388e1e3bc8 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Wed, 11 Jan 2017 15:00:24 +0100
Subject: [PATCH 12/16] s3/smbd: enable processing SMB2 requests async
 internally

The idea is to allow the implementation of an SMB2 request to tell the
main SMB2 processing engine that it wants to handle a request
asynchronously internally.

This has two use cases:

- it allows (internal) async processing of compound requests that would
  otherwise be rejected by the SMB2 processing engine

- it preserves sync semantics at the SMB layer, as some clients might not
  expect arbitrary SMB2 requests going async

Not used for now, will be used in a later commit for async SMB2 FIND
requests.
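
For illustration only (not part of this patch), a hypothetical SMB2
request implementation could flag itself like this; the my_smb2_op_*
names are made up and tevent_wakeup_send() merely stands in for whatever
real async backend call such a handler would make:

struct my_smb2_op_state {
	struct smbd_smb2_request *smb2req;
};

static void my_smb2_op_waited(struct tevent_req *subreq);

static struct tevent_req *my_smb2_op_send(TALLOC_CTX *mem_ctx,
					  struct tevent_context *ev,
					  struct smbd_smb2_request *smb2req)
{
	struct tevent_req *req = NULL;
	struct my_smb2_op_state *state = NULL;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct my_smb2_op_state);
	if (req == NULL) {
		return NULL;
	}
	state->smb2req = smb2req;

	/*
	 * Tell the SMB2 processing engine that this request goes async
	 * internally: no interim response is sent and compound chains
	 * are still processed.
	 */
	smb2_request_set_async_internal(smb2req, true);

	subreq = tevent_wakeup_send(state, ev, timeval_current_ofs(0, 100));
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, my_smb2_op_waited, req);
	return req;
}

static void my_smb2_op_waited(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	bool ok;

	ok = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!ok) {
		tevent_req_oom(req);
		return;
	}
	tevent_req_done(req);
}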

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/smbd/globals.h     | 10 ++++++++++
 source3/smbd/smb2_server.c | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/source3/smbd/globals.h b/source3/smbd/globals.h
index 22e364c..ae5ecf4 100644
--- a/source3/smbd/globals.h
+++ b/source3/smbd/globals.h
@@ -276,6 +276,9 @@ NTSTATUS smbd_smb2_request_verify_creditcharge(struct smbd_smb2_request *req,
 NTSTATUS smbd_smb2_request_verify_sizes(struct smbd_smb2_request *req,
 					size_t expected_body_size);
 
+void smb2_request_set_async_internal(struct smbd_smb2_request *req,
+				     bool async_internal);
+
 enum protocol_types smbd_smb2_protocol_dialect_match(const uint8_t *indyn,
 		                                     const int dialect_count,
 						     uint16_t *dialect);
@@ -708,6 +711,13 @@ struct smbd_smb2_request {
 	bool compound_related;
 
 	/*
+	 * Give the implementation of an SMB2 req a way to tell the SMB2 request
+	 * processing engine that the internal request is going async, while
+	 * preserving synchronous SMB2 behaviour.
+	 */
+	bool async_internal;
+
+	/*
 	 * the encryption key for the whole
 	 * compound chain
 	 */
diff --git a/source3/smbd/smb2_server.c b/source3/smbd/smb2_server.c
index acaa012..d95631f 100644
--- a/source3/smbd/smb2_server.c
+++ b/source3/smbd/smb2_server.c
@@ -272,6 +272,12 @@ static int smbd_smb2_request_destructor(struct smbd_smb2_request *req)
 	return 0;
 }
 
+void smb2_request_set_async_internal(struct smbd_smb2_request *req,
+				     bool async_internal)
+{
+	req->async_internal = async_internal;
+}
+
 static struct smbd_smb2_request *smbd_smb2_request_allocate(TALLOC_CTX *mem_ctx)
 {
 	TALLOC_CTX *mem_pool;
@@ -1365,6 +1371,17 @@ NTSTATUS smbd_smb2_request_pending_queue(struct smbd_smb2_request *req,
 		return NT_STATUS_OK;
 	}
 
+	if (req->async_internal) {
+		/*
+		 * An SMB2 request implementation wants to handle the request
+		 * asynchronously "internally" while keeping synchronous
+		 * behaviour for the SMB2 request. This means we don't send an
+		 * interim response and we can allow processing of compound SMB2
+		 * requests (cf the subsequent check) for all cases.
+		 */
+		return NT_STATUS_OK;
+	}
+
 	if (req->in.vector_count > req->current_idx + SMBD_SMB2_NUM_IOV_PER_REQ) {
 		/*
 		 * We're trying to go async in a compound request
@@ -2292,6 +2309,7 @@ NTSTATUS smbd_smb2_request_dispatch(struct smbd_smb2_request *req)
 		encryption_required = x->global->encryption_flags & SMBXSRV_ENCRYPTION_REQUIRED;
 	}
 
+	req->async_internal = false;
 	req->do_signing = false;
 	req->do_encryption = false;
 	req->was_encrypted = false;
-- 
2.9.3


From 6ab95ccf97bd030c8f208518ead93180ea995504 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Sun, 18 Dec 2016 08:53:43 +0100
Subject: [PATCH 13/16] s3/smbd: make write time fetching async

Finally use the new async dbwrap_parse_record_send/recv() functions, or
rather the fetch_share_mode_send/recv() wrappers, for fetching the
write time from locking.tdb.

Previously, for a directory with n files we would sit idle in the
directory enumeration loop for n * m seconds waiting for responses from
ctdb, where m is the response time in seconds for a dbwrap request via
ctdb.

This is known to kill performance and we even have a parameter
"smbd:search ask sharemode" that can be used to disable fetching the
write time from locking.tdb.

fetch_write_time_send() is used as follows: in the directory
enumeration loop that calls smbd_dirptr_lanman2_entry() to marshall the
directory entries, we

1. call fetch_write_time_send() after calling smbd_dirptr_lanman2_entry(),
   passing a pointer to the position of the just-marshalled entry in the
   marshall buffer.

2. If fetch_write_time_send() has set the out parameter "stop", we exit
   the enumeration loop. This is necessary because we only send dbwrap
   requests but don't consume the results. This has the potential to
   deadlock so we must stop sending requests as soon as our ctdb send
   queue is full.

3. In the fetch_write_time_done() callback, if the recv function got a
   locking.tdb record, we push the write time into the marshall buffer
   at the offset saved in the request state.

The new async path doesn't give any improvement in the non-clustered
use case, so it is only used when "clustering = yes".
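
For step 3, distilled into a hypothetical standalone helper (the real
code is fetch_write_time_done() in the diff below; the helper name is
made up). Offset 24 is where LastWriteTime sits in the marshalled entry
for the listed info levels:

/*
 * Patch the write time into an already marshalled directory entry.
 * Illustration only, reduced from fetch_write_time_done() below.
 */
static void patch_entry_write_time(connection_struct *conn,
				   int info_level,
				   char *entry_marshall_buf,
				   struct timespec write_time)
{
	size_t off;

	switch (info_level) {
	case SMB_FIND_FILE_DIRECTORY_INFO:
	case SMB_FIND_FILE_FULL_DIRECTORY_INFO:
	case SMB_FIND_FILE_BOTH_DIRECTORY_INFO:
	case SMB_FIND_ID_FULL_DIRECTORY_INFO:
	case SMB_FIND_ID_BOTH_DIRECTORY_INFO:
		off = 24;
		break;
	default:
		/* Other info levels are not handled by this mechanism. */
		return;
	}

	put_long_date_timespec(conn->ts_res,
			       entry_marshall_buf + off,
			       write_time);
}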

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/smbd/smb2_query_directory.c | 206 +++++++++++++++++++++++++++++++++++-
 1 file changed, 201 insertions(+), 5 deletions(-)

diff --git a/source3/smbd/smb2_query_directory.c b/source3/smbd/smb2_query_directory.c
index 0b84f84..fc9ce52 100644
--- a/source3/smbd/smb2_query_directory.c
+++ b/source3/smbd/smb2_query_directory.c
@@ -193,11 +193,24 @@ static void smbd_smb2_request_find_done(struct tevent_req *subreq)
 	}
 }
 
+static struct tevent_req *fetch_write_time_send(TALLOC_CTX *mem_ctx,
+						struct tevent_context *ev,
+						connection_struct *conn,
+						struct file_id id,
+						int info_level,
+						char *entry_marshall_buf,
+						bool *stop);
+static NTSTATUS fetch_write_time_recv(struct tevent_req *req);
+
+
 struct smbd_smb2_query_directory_state {
 	struct smbd_smb2_request *smb2req;
+	uint64_t async_count;
 	DATA_BLOB out_output_buffer;
 };
 
+static void smb2_query_directory_fetch_write_time_done(struct tevent_req *subreq);
+
 static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 					      struct tevent_context *ev,
 					      struct smbd_smb2_request *smb2req,
@@ -226,6 +239,7 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 	uint32_t dirtype = FILE_ATTRIBUTE_HIDDEN | FILE_ATTRIBUTE_SYSTEM | FILE_ATTRIBUTE_DIRECTORY;
 	bool dont_descend = false;
 	bool ask_sharemode = false;
+	bool async_ask_sharemode = false;
 	bool wcard_has_wild = false;
 	struct tm tm;
 	char *p;
@@ -462,9 +476,24 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 					     true);
 	}
 
+	if (ask_sharemode && lp_clustering()) {
+		ask_sharemode = false;
+		async_ask_sharemode = true;
+
+		/*
+		 * Should we only set async_internal
+		 * if we're not the last request in
+		 * a compound chain?
+		 */
+		smb2_request_set_async_internal(smb2req, true);
+	}
+
 	while (true) {
 		bool got_exact_match = false;
 		int space_remaining = in_output_buffer_length - off;
+		int cur_off = off;
+		struct file_id file_id;
+		bool stop = false;
 
 		SMB_ASSERT(space_remaining >= 0);
 
@@ -487,7 +516,7 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 					       &got_exact_match,
 					       &last_entry_off,
 					       NULL,
-					       NULL);
+					       &file_id);
 
 		off = (int)PTR_DIFF(pdata, base_data);
 
@@ -499,9 +528,7 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 				 */
 				continue;
 			} else if (num > 0) {
-				SIVAL(state->out_output_buffer.data, last_entry_off, 0);
-				tevent_req_done(req);
-				return tevent_req_post(req, ev);
+				goto last_entry_done;
 			} else if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
 				tevent_req_nterror(req, NT_STATUS_INFO_LENGTH_MISMATCH);
 				return tevent_req_post(req, ev);
@@ -511,14 +538,45 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 			}
 		}
 
+		if (async_ask_sharemode) {
+			struct tevent_req *subreq = NULL;
+
+			subreq = fetch_write_time_send(req,
+						       ev,
+						       conn,
+						       file_id,
+						       info_level,
+						       base_data + cur_off,
+						       &stop);
+			if (tevent_req_nomem(subreq, req)) {
+				return tevent_req_post(req, ev);
+			}
+			tevent_req_set_callback(
+				subreq,
+				smb2_query_directory_fetch_write_time_done,
+				req);
+
+			state->async_count++;
+		}
+
 		num++;
 		state->out_output_buffer.length = off;
 
-		if (num < max_count) {
+		if (num >= max_count) {
+			stop = true;
+		}
+
+		if (!stop) {
 			continue;
 		}
 
+last_entry_done:
 		SIVAL(state->out_output_buffer.data, last_entry_off, 0);
+		if (state->async_count > 0) {
+			DBG_DEBUG("Stopping after %zu async mtime updates\n",
+				  state->async_count);
+			return req;
+		}
 		tevent_req_done(req);
 		return tevent_req_post(req, ev);
 	}
@@ -527,6 +585,30 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 	return tevent_req_post(req, ev);
 }
 
+static void smb2_query_directory_fetch_write_time_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct smbd_smb2_query_directory_state *state = tevent_req_data(
+		req, struct smbd_smb2_query_directory_state);
+	NTSTATUS status;
+
+	state->async_count--;
+
+	status = fetch_write_time_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (tevent_req_nterror(req, status)) {
+		return;
+	}
+
+	if (state->async_count > 0) {
+		return;
+	}
+
+	tevent_req_done(req);
+	return;
+}
+
 static NTSTATUS smbd_smb2_query_directory_recv(struct tevent_req *req,
 				    TALLOC_CTX *mem_ctx,
 				    DATA_BLOB *out_output_buffer)
@@ -546,3 +628,117 @@ static NTSTATUS smbd_smb2_query_directory_recv(struct tevent_req *req,
 	tevent_req_received(req);
 	return NT_STATUS_OK;
 }
+
+struct fetch_write_time_state {
+	connection_struct *conn;
+	struct file_id id;
+	int info_level;
+	char *entry_marshall_buf;
+};
+
+static void fetch_write_time_done(struct tevent_req *subreq);
+
+static struct tevent_req *fetch_write_time_send(TALLOC_CTX *mem_ctx,
+						struct tevent_context *ev,
+						connection_struct *conn,
+						struct file_id id,
+						int info_level,
+						char *entry_marshall_buf,
+						bool *stop)
+{
+	struct tevent_req *req = NULL;
+	struct fetch_write_time_state *state = NULL;
+	struct tevent_req *subreq = NULL;
+	bool req_queued;
+
+	*stop = false;
+
+	req = tevent_req_create(mem_ctx, &state, struct fetch_write_time_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	*state = (struct fetch_write_time_state) {
+		.conn = conn,
+		.id = id,
+		.info_level = info_level,
+		.entry_marshall_buf = entry_marshall_buf,
+	};
+
+	subreq = fetch_share_mode_send(state, ev, id, &req_queued);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, fetch_write_time_done, req);
+
+	if (req_queued) {
+		*stop = true;
+	}
+	return req;
+}
+
+static void fetch_write_time_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct fetch_write_time_state *state = tevent_req_data(
+		req, struct fetch_write_time_state);
+	struct timespec write_time;
+	struct share_mode_lock *lck = NULL;
+	NTSTATUS status;
+	size_t off;
+
+	status = fetch_share_mode_recv(subreq, state, &lck);
+	TALLOC_FREE(subreq);
+	if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
+		tevent_req_done(req);
+		return;
+	}
+	if (!NT_STATUS_IS_OK(status)) {
+		tevent_req_nterror(req, status);
+		return;
+	}
+
+	write_time = get_share_mode_write_time(lck);
+	TALLOC_FREE(lck);
+
+	if (null_timespec(write_time)) {
+		tevent_req_done(req);
+		return;
+	}
+
+	switch (state->info_level) {
+	case SMB_FIND_FILE_DIRECTORY_INFO:
+	case SMB_FIND_FILE_FULL_DIRECTORY_INFO:
+	case SMB_FIND_FILE_BOTH_DIRECTORY_INFO:
+	case SMB_FIND_ID_FULL_DIRECTORY_INFO:
+	case SMB_FIND_ID_BOTH_DIRECTORY_INFO:
+		off = 24;
+		break;
+
+	default:
+		DBG_ERR("Unsupported info_level [%d]\n", state->info_level);
+		tevent_req_nterror(req, NT_STATUS_INVALID_LEVEL);
+		return;
+	}
+
+	put_long_date_timespec(state->conn->ts_res,
+			       state->entry_marshall_buf + off,
+			       write_time);
+
+	tevent_req_done(req);
+	return;
+}
+
+static NTSTATUS fetch_write_time_recv(struct tevent_req *req)
+{
+	NTSTATUS status;
+
+	if (tevent_req_is_nterror(req, &status)) {
+		tevent_req_received(req);
+		return status;
+	}
+
+	tevent_req_received(req);
+	return NT_STATUS_OK;
+}
-- 
2.9.3


From 2f43453a27c5f00e8e74c1bd4b3eb20911b0b099 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Wed, 11 Jan 2017 15:36:38 +0100
Subject: [PATCH 14/16] s3/smbd: add "smbd:find async delay usec" to SMB2 FIND

This is just a hack for selftest that will be used in subsequent commits
for torturing compound find requests.

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source3/smbd/smb2_query_directory.c | 66 +++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)

diff --git a/source3/smbd/smb2_query_directory.c b/source3/smbd/smb2_query_directory.c
index fc9ce52..9150673 100644
--- a/source3/smbd/smb2_query_directory.c
+++ b/source3/smbd/smb2_query_directory.c
@@ -204,12 +204,15 @@ static NTSTATUS fetch_write_time_recv(struct tevent_req *req);
 
 
 struct smbd_smb2_query_directory_state {
+	struct tevent_context *ev;
 	struct smbd_smb2_request *smb2req;
 	uint64_t async_count;
+	uint32_t find_async_delay_usec;
 	DATA_BLOB out_output_buffer;
 };
 
 static void smb2_query_directory_fetch_write_time_done(struct tevent_req *subreq);
+static void smb2_query_directory_waited(struct tevent_req *subreq);
 
 static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 					      struct tevent_context *ev,
@@ -249,6 +252,7 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 	if (req == NULL) {
 		return NULL;
 	}
+	state->ev = ev;
 	state->smb2req = smb2req;
 	state->out_output_buffer = data_blob_null;
 
@@ -488,6 +492,13 @@ static struct tevent_req *smbd_smb2_query_directory_send(TALLOC_CTX *mem_ctx,
 		smb2_request_set_async_internal(smb2req, true);
 	}
 
+	/*
+	 * This gets set in autobuild for some tests
+	 */
+	state->find_async_delay_usec = lp_parm_ulong(SNUM(conn), "smbd",
+						     "find async delay usec",
+						     0);
+
 	while (true) {
 		bool got_exact_match = false;
 		int space_remaining = in_output_buffer_length - off;
@@ -577,6 +588,30 @@ last_entry_done:
 				  state->async_count);
 			return req;
 		}
+
+		if (state->find_async_delay_usec > 0) {
+			struct timeval tv;
+			struct tevent_req *subreq = NULL;
+
+			/*
+			 * Should we only set async_internal
+			 * if we're not the last request in
+			 * a compound chain?
+			 */
+			smb2_request_set_async_internal(smb2req, true);
+
+			tv = timeval_current_ofs(0, state->find_async_delay_usec);
+
+			subreq = tevent_wakeup_send(state, ev, tv);
+			if (tevent_req_nomem(subreq, req)) {
+				return tevent_req_post(req, ev);
+			}
+			tevent_req_set_callback(subreq,
+						smb2_query_directory_waited,
+						req);
+			return req;
+		}
+
 		tevent_req_done(req);
 		return tevent_req_post(req, ev);
 	}
@@ -605,10 +640,41 @@ static void smb2_query_directory_fetch_write_time_done(struct tevent_req *subreq
 		return;
 	}
 
+	if (state->find_async_delay_usec > 0) {
+		struct timeval tv;
+
+		tv = timeval_current_ofs(0, state->find_async_delay_usec);
+
+		subreq = tevent_wakeup_send(state, state->ev, tv);
+		if (tevent_req_nomem(subreq, req)) {
+			tevent_req_post(req, state->ev);
+			return;
+		}
+		tevent_req_set_callback(subreq,
+					smb2_query_directory_waited,
+					req);
+		return;
+	}
+
 	tevent_req_done(req);
 	return;
 }
 
+static void smb2_query_directory_waited(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	bool ok;
+
+	ok = tevent_wakeup_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (!ok) {
+		tevent_req_oom(req);
+		return;
+	}
+	tevent_req_done(req);
+}
+
 static NTSTATUS smbd_smb2_query_directory_recv(struct tevent_req *req,
 				    TALLOC_CTX *mem_ctx,
 				    DATA_BLOB *out_output_buffer)
-- 
2.9.3


From 3928d5f96b2ca8ba4ca0e35e73f560488d71f5ee Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Wed, 11 Jan 2017 17:09:54 +0100
Subject: [PATCH 15/16] s4/torture: add a test for compound SMB2 FIND requests

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 source4/torture/smb2/compound.c | 123 ++++++++++++++++++++++++++++++++++++++++
 source4/torture/smb2/smb2.c     |   1 +
 2 files changed, 124 insertions(+)

diff --git a/source4/torture/smb2/compound.c b/source4/torture/smb2/compound.c
index a502103..1480576 100644
--- a/source4/torture/smb2/compound.c
+++ b/source4/torture/smb2/compound.c
@@ -1103,6 +1103,117 @@ done:
     return ret;
 }
 
+/* Test compound related finds */
+static bool test_compound_find_related(struct torture_context *tctx,
+				       struct smb2_tree *tree)
+{
+	TALLOC_CTX *mem_ctx = talloc_new(tctx);
+	const char *dname = "compound_find_dir";
+	struct smb2_create create;
+	struct smb2_find f;
+	struct smb2_handle h;
+	struct smb2_request *req[2];
+	NTSTATUS status;
+	bool ret = true;
+
+	smb2_deltree(tree, dname);
+
+	ZERO_STRUCT(create);
+	create.in.desired_access = SEC_RIGHTS_DIR_ALL;
+	create.in.create_options = NTCREATEX_OPTIONS_DIRECTORY;
+	create.in.file_attributes = FILE_ATTRIBUTE_DIRECTORY;
+	create.in.share_access = NTCREATEX_SHARE_ACCESS_READ |
+				 NTCREATEX_SHARE_ACCESS_WRITE |
+				 NTCREATEX_SHARE_ACCESS_DELETE;
+	create.in.create_disposition = NTCREATEX_DISP_CREATE;
+	create.in.fname = dname;
+
+	status = smb2_create(tree, mem_ctx, &create);
+	h = create.out.file.handle;
+
+	torture_assert_ntstatus_ok_goto(tctx, status, ret, done, "smb2_create failed\n");
+
+	smb2_transport_compound_start(tree->session->transport, 2);
+
+	ZERO_STRUCT(f);
+	f.in.file.handle	= h;
+	f.in.pattern		= "*";
+	f.in.max_response_size	= 0x100;
+	f.in.level              = SMB2_FIND_BOTH_DIRECTORY_INFO;
+
+	req[0] = smb2_find_send(tree, &f);
+
+	smb2_transport_compound_set_related(tree->session->transport, true);
+
+	req[1] = smb2_find_send(tree, &f);
+
+	status = smb2_find_recv(req[0], mem_ctx, &f);
+	torture_assert_ntstatus_ok_goto(tctx, status, ret, done, "smb2_find_recv failed\n");
+
+	status = smb2_find_recv(req[1], mem_ctx, &f);
+	torture_assert_ntstatus_equal_goto(tctx, status, STATUS_NO_MORE_FILES, ret, done, "smb2_find_recv failed\n");
+
+done:
+	smb2_util_close(tree, h);
+	smb2_deltree(tree, dname);
+	TALLOC_FREE(mem_ctx);
+	return ret;
+}
+
+/* Test compound unrelated finds */
+static bool test_compound_find_unrelated(struct torture_context *tctx,
+					 struct smb2_tree *tree)
+{
+	TALLOC_CTX *mem_ctx = talloc_new(tctx);
+	const char *dname = "compound_find_dir";
+	struct smb2_create create;
+	struct smb2_find f;
+	struct smb2_handle h;
+	struct smb2_request *req[2];
+	NTSTATUS status;
+	bool ret = true;
+
+	smb2_deltree(tree, dname);
+
+	ZERO_STRUCT(create);
+	create.in.desired_access = SEC_RIGHTS_DIR_ALL;
+	create.in.create_options = NTCREATEX_OPTIONS_DIRECTORY;
+	create.in.file_attributes = FILE_ATTRIBUTE_DIRECTORY;
+	create.in.share_access = NTCREATEX_SHARE_ACCESS_READ |
+				 NTCREATEX_SHARE_ACCESS_WRITE |
+				 NTCREATEX_SHARE_ACCESS_DELETE;
+	create.in.create_disposition = NTCREATEX_DISP_CREATE;
+	create.in.fname = dname;
+
+	status = smb2_create(tree, mem_ctx, &create);
+	h = create.out.file.handle;
+
+	torture_assert_ntstatus_ok_goto(tctx, status, ret, done, "smb2_create failed\n");
+
+	smb2_transport_compound_start(tree->session->transport, 2);
+
+	ZERO_STRUCT(f);
+	f.in.file.handle	= h;
+	f.in.pattern		= "*";
+	f.in.max_response_size	= 0x100;
+	f.in.level              = SMB2_FIND_BOTH_DIRECTORY_INFO;
+
+	req[0] = smb2_find_send(tree, &f);
+	req[1] = smb2_find_send(tree, &f);
+
+	status = smb2_find_recv(req[0], mem_ctx, &f);
+	torture_assert_ntstatus_ok_goto(tctx, status, ret, done, "smb2_find_recv failed\n");
+
+	status = smb2_find_recv(req[1], mem_ctx, &f);
+	torture_assert_ntstatus_equal_goto(tctx, status, STATUS_NO_MORE_FILES, ret, done, "smb2_find_recv failed\n");
+
+done:
+	smb2_util_close(tree, h);
+	smb2_deltree(tree, dname);
+	TALLOC_FREE(mem_ctx);
+	return ret;
+}
+
 struct torture_suite *torture_smb2_compound_init(void)
 {
 	struct torture_suite *suite = torture_suite_create(talloc_autofree_context(), "compound");
@@ -1124,3 +1235,15 @@ struct torture_suite *torture_smb2_compound_init(void)
 
 	return suite;
 }
+
+struct torture_suite *torture_smb2_compound_find_init(void)
+{
+	struct torture_suite *suite = torture_suite_create(talloc_autofree_context(), "compound_find");
+
+	torture_suite_add_1smb2_test(suite, "compound_find_related", test_compound_find_related);
+	torture_suite_add_1smb2_test(suite, "compound_find_unrelated", test_compound_find_unrelated);
+
+	suite->description = talloc_strdup(suite, "SMB2-COMPOUND-FIND tests");
+
+	return suite;
+}
diff --git a/source4/torture/smb2/smb2.c b/source4/torture/smb2/smb2.c
index 9f8cbe7..d8e3a06 100644
--- a/source4/torture/smb2/smb2.c
+++ b/source4/torture/smb2/smb2.c
@@ -162,6 +162,7 @@ NTSTATUS torture_smb2_init(void)
 	torture_suite_add_suite(suite, torture_smb2_dir_init());
 	torture_suite_add_suite(suite, torture_smb2_lease_init());
 	torture_suite_add_suite(suite, torture_smb2_compound_init());
+	torture_suite_add_suite(suite, torture_smb2_compound_find_init());
 	torture_suite_add_suite(suite, torture_smb2_oplocks_init());
 	torture_suite_add_suite(suite, torture_smb2_kernel_oplocks_init());
 	torture_suite_add_suite(suite, torture_smb2_streams_init());
-- 
2.9.3


From 3839905bb54e504a2e15aad8ca981b54a8233366 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Thu, 23 Feb 2017 22:20:39 +0100
Subject: [PATCH 16/16] selftest: also run smb2.compound_find against share
 with async delay set

Add a share with "smbd:find async delay usec" set to 10000 and run the
test smb2.compound_find added in the previous commit against this new
share as well.

Signed-off-by: Ralph Boehme <slow at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
 selftest/target/Samba3.pm | 4 ++++
 source3/selftest/tests.py | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/selftest/target/Samba3.pm b/selftest/target/Samba3.pm
index 0aa88ee..c241bd1 100755
--- a/selftest/target/Samba3.pm
+++ b/selftest/target/Samba3.pm
@@ -1979,6 +1979,10 @@ sub provision($$$$$$$$)
 	copy = tmp
 	kernel oplocks = yes
 	vfs objects = streams_xattr xattr_tdb
+
+[compound_find]
+	copy = tmp
+	smbd:find async delay usec = 10000
 	";
 	close(CONF);
 
diff --git a/source3/selftest/tests.py b/source3/selftest/tests.py
index 3959439..9bb7903 100755
--- a/source3/selftest/tests.py
+++ b/source3/selftest/tests.py
@@ -452,6 +452,10 @@ for t in tests:
             plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER/kernel_oplocks -U$USERNAME%$PASSWORD')
     elif t == "vfs.acl_xattr":
         plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER_IP/tmp -U$USERNAME%$PASSWORD')
+    elif t == "smb2.compound_find":
+        plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER/compound_find -U$USERNAME%$PASSWORD')
+        plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER_IP/tmp -U$USERNAME%$PASSWORD')
+        plansmbtorture4testsuite(t, "ad_dc", '//$SERVER/tmp -U$USERNAME%$PASSWORD')
     else:
         plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER_IP/tmp -U$USERNAME%$PASSWORD')
         plansmbtorture4testsuite(t, "ad_dc", '//$SERVER/tmp -U$USERNAME%$PASSWORD')
-- 
2.9.3


