svn commit: samba r15049 - in branches/SAMBA_4_0/source: lib/messaging torture/local

tridge at samba.org tridge at samba.org
Wed Apr 12 06:08:25 GMT 2006


Author: tridge
Date: 2006-04-12 06:08:24 +0000 (Wed, 12 Apr 2006)
New Revision: 15049

WebSVN: http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=15049

Log:

for really efficient oplock handling with thousands of open files we
will need a separate messaging endpoint per open file. To make this
efficient extend the messaging layer to have a new registration
function for temporary message types that maps via an idtree.

I have updated the LOCAL-MESSAGING test to use the new function.

Modified:
   branches/SAMBA_4_0/source/lib/messaging/irpc.h
   branches/SAMBA_4_0/source/lib/messaging/messaging.c
   branches/SAMBA_4_0/source/lib/messaging/messaging.h
   branches/SAMBA_4_0/source/torture/local/messaging.c


Changeset:
Modified: branches/SAMBA_4_0/source/lib/messaging/irpc.h
===================================================================
--- branches/SAMBA_4_0/source/lib/messaging/irpc.h	2006-04-12 04:42:40 UTC (rev 15048)
+++ branches/SAMBA_4_0/source/lib/messaging/irpc.h	2006-04-12 06:08:24 UTC (rev 15049)
@@ -76,14 +76,18 @@
 	} async;
 };
 
+typedef void (*msg_callback_t)(struct messaging_context *msg, void *private, 
+			       uint32_t msg_type, uint32_t server_id, DATA_BLOB *data);
 
 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, 
 					 struct event_context *ev);
 NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, 
 			uint32_t msg_type, DATA_BLOB *data);
-void messaging_register(struct messaging_context *msg, void *private,
-			uint32_t msg_type, 
-			void (*fn)(struct messaging_context *, void *, uint32_t, uint32_t, DATA_BLOB *));
+NTSTATUS messaging_register(struct messaging_context *msg, void *private,
+			    uint32_t msg_type, 
+			    msg_callback_t fn);
+NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private,
+				msg_callback_t fn, uint32_t *msg_type);
 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, 
 					 struct event_context *ev);
 struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 

Modified: branches/SAMBA_4_0/source/lib/messaging/messaging.c
===================================================================
--- branches/SAMBA_4_0/source/lib/messaging/messaging.c	2006-04-12 04:42:40 UTC (rev 15048)
+++ branches/SAMBA_4_0/source/lib/messaging/messaging.c	2006-04-12 06:08:24 UTC (rev 15049)
@@ -41,7 +41,9 @@
 	struct socket_context *sock;
 	const char *base_path;
 	const char *path;
-	struct dispatch_fn *dispatch;
+	struct dispatch_fn **dispatch;
+	uint32_t num_types;
+	struct idr_context *dispatch_tree;
 	struct messaging_rec *pending;
 	struct irpc_list *irpc;
 	struct idr_context *idr;
@@ -54,14 +56,13 @@
 	} event;
 };
 
-/* we have a linked list of dispatch handlers that this messaging
-   server can deal with */
+/* we have a linked list of dispatch handlers for each msg_type that
+   this messaging server can deal with */
 struct dispatch_fn {
 	struct dispatch_fn *next, *prev;
 	uint32_t msg_type;
 	void *private;
-	void (*fn)(struct messaging_context *msg, void *private, 
-		   uint32_t msg_type, uint32_t server_id, DATA_BLOB *data);
+	msg_callback_t fn;
 };
 
 /* an individual message */
@@ -127,14 +128,22 @@
 static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec)
 {
 	struct dispatch_fn *d, *next;
-	for (d=msg->dispatch;d;d=next) {
+
+	/* temporary IDs use an idtree, the rest use a array of pointers */
+	if (rec->header->msg_type >= MSG_TMP_BASE) {
+		d = idr_find(msg->dispatch_tree, rec->header->msg_type);
+	} else if (rec->header->msg_type < msg->num_types) {
+		d = msg->dispatch[rec->header->msg_type];
+	} else {
+		d = NULL;
+	}
+
+	for (; d; d = next) {
+		DATA_BLOB data;
 		next = d->next;
-		if (d->msg_type == rec->header->msg_type) {
-			DATA_BLOB data;
-			data.data = rec->packet.data + sizeof(*rec->header);
-			data.length = rec->header->length;
-			d->fn(msg, d->private, d->msg_type, rec->header->from, &data);
-		}
+		data.data = rec->packet.data + sizeof(*rec->header);
+		data.length = rec->header->length;
+		d->fn(msg, d->private, d->msg_type, rec->header->from, &data);
 	}
 	rec->header->length = 0;
 }
@@ -272,34 +281,96 @@
 /*
   Register a dispatch function for a particular message type.
 */
-void messaging_register(struct messaging_context *msg, void *private,
-			uint32_t msg_type, 
-			void (*fn)(struct messaging_context *, void *, uint32_t, uint32_t, DATA_BLOB *))
+NTSTATUS messaging_register(struct messaging_context *msg, void *private,
+			    uint32_t msg_type, msg_callback_t fn)
 {
 	struct dispatch_fn *d;
 
-	d = talloc(msg, struct dispatch_fn);
+	/* possibly expand dispatch array */
+	if (msg_type >= msg->num_types) {
+		struct dispatch_fn **dp;
+		int i;
+		dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
+		NT_STATUS_HAVE_NO_MEMORY(dp);
+		msg->dispatch = dp;
+		for (i=msg->num_types;i<=msg_type;i++) {
+			msg->dispatch[i] = NULL;
+		}
+		msg->num_types = msg_type+1;
+	}
+
+
+	d = talloc(msg->dispatch, struct dispatch_fn);
+	NT_STATUS_HAVE_NO_MEMORY(d);
 	d->msg_type = msg_type;
 	d->private = private;
 	d->fn = fn;
-	DLIST_ADD(msg->dispatch, d);
+
+	DLIST_ADD(msg->dispatch[msg_type], d);
+
+	return NT_STATUS_OK;
 }
 
 /*
+  register a temporary message handler. The msg_type is allocated
+  above MSG_TMP_BASE
+*/
+NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private,
+				msg_callback_t fn, uint32_t *msg_type)
+{
+	struct dispatch_fn *d;
+	int id;
+
+	d = talloc_zero(msg->dispatch, struct dispatch_fn);
+	NT_STATUS_HAVE_NO_MEMORY(d);
+	d->private = private;
+	d->fn = fn;
+
+	id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
+	if (id == -1) {
+		talloc_free(d);
+		return NT_STATUS_TOO_MANY_CONTEXT_IDS;
+	}
+
+	d->msg_type = (uint32_t)id;
+	(*msg_type) = d->msg_type;
+
+	return NT_STATUS_OK;
+}
+
+/*
   De-register the function for a particular message type.
 */
 void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private)
 {
-	struct dispatch_fn *d, *next;
+	struct dispatch_fn *d, *list, *next;
 
-	for (d = msg->dispatch; d; d = next) {
+	if (msg_type >= msg->num_types) {
+		list = idr_find(msg->dispatch_tree, msg_type);
+	} else {
+		list = msg->dispatch[msg_type];
+	}
+
+	if (list == NULL) {
+		return;
+	}
+
+	for (d = list; d; d = next) {
 		next = d->next;
-		if (d->msg_type == msg_type && 
-		    d->private == private) {
-			DLIST_REMOVE(msg->dispatch, d);
+		if (d->private == private) {
+			DLIST_REMOVE(list, d);
 			talloc_free(d);
 		}
 	}	
+
+	/* the list base possibly changed */
+	if (list == NULL) {
+		if (msg_type >= msg->num_types) {
+			idr_remove(msg->dispatch_tree, msg_type);
+		} else {
+			msg->dispatch[msg_type] = NULL;
+		}
+	}
 }
 
 
@@ -397,7 +468,7 @@
 	struct socket_address *path;
 	char *dir;
 
-	msg = talloc(mem_ctx, struct messaging_context);
+	msg = talloc_zero(mem_ctx, struct messaging_context);
 	if (msg == NULL) {
 		return NULL;
 	}
@@ -411,15 +482,12 @@
 	mkdir(dir, 0700);
 	talloc_free(dir);
 
-	msg->base_path  = smbd_tmp_path(msg, "messaging");
-	msg->path       = messaging_path(msg, server_id);
-	msg->server_id  = server_id;
-	msg->dispatch   = NULL;
-	msg->pending    = NULL;
-	msg->idr        = idr_init(msg);
-	msg->irpc       = NULL;
-	msg->names      = NULL;
-	msg->start_time = timeval_current();
+	msg->base_path     = smbd_tmp_path(msg, "messaging");
+	msg->path          = messaging_path(msg, server_id);
+	msg->server_id     = server_id;
+	msg->idr           = idr_init(msg);
+	msg->dispatch_tree = idr_init(msg);
+	msg->start_time    = timeval_current();
 
 	status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
 	if (!NT_STATUS_IS_OK(status)) {

Modified: branches/SAMBA_4_0/source/lib/messaging/messaging.h
===================================================================
--- branches/SAMBA_4_0/source/lib/messaging/messaging.h	2006-04-12 04:42:40 UTC (rev 15048)
+++ branches/SAMBA_4_0/source/lib/messaging/messaging.h	2006-04-12 06:08:24 UTC (rev 15049)
@@ -34,4 +34,7 @@
 #define MSG_PVFS_NOTIFY		7
 #define MSG_NTVFS_OPLOCK_BREAK	8
 
+/* temporary messaging endpoints are allocated above this line */
+#define MSG_TMP_BASE		1000
+
 #endif

Modified: branches/SAMBA_4_0/source/torture/local/messaging.c
===================================================================
--- branches/SAMBA_4_0/source/torture/local/messaging.c	2006-04-12 04:42:40 UTC (rev 15048)
+++ branches/SAMBA_4_0/source/torture/local/messaging.c	2006-04-12 06:08:24 UTC (rev 15049)
@@ -25,13 +25,14 @@
 #include "lib/messaging/irpc.h"
 #include "torture/torture.h"
 
-enum {MY_PING=1000, MY_PONG, MY_EXIT};
 
+static uint32_t msg_pong;
+
 static void ping_message(struct messaging_context *msg, void *private, 
 			 uint32_t msg_type, uint32_t src, DATA_BLOB *data)
 {
 	NTSTATUS status;
-	status = messaging_send(msg, src, MY_PONG, data);
+	status = messaging_send(msg, src, msg_pong, data);
 	if (!NT_STATUS_IS_OK(status)) {
 		printf("pong failed - %s\n", nt_errstr(status));
 	}
@@ -64,6 +65,7 @@
 	BOOL ret = True;
 	struct timeval tv;
 	int timelimit = lp_parm_int(-1, "torture", "timelimit", 10);
+	uint32_t msg_ping, msg_exit;
 
 	lp_set_cmdline("lock dir", "lockdir.tmp");
 
@@ -77,8 +79,8 @@
 		return False;
 	}
 		
-	messaging_register(msg_server_ctx, NULL, MY_PING, ping_message);
-	messaging_register(msg_server_ctx, mem_ctx, MY_EXIT, exit_message);
+	messaging_register_tmp(msg_server_ctx, NULL, ping_message, &msg_ping);
+	messaging_register_tmp(msg_server_ctx, mem_ctx, exit_message, &msg_exit);
 
 	msg_client_ctx = messaging_init(mem_ctx, 2, ev);
 
@@ -87,7 +89,7 @@
 		return False;
 	}
 
-	messaging_register(msg_client_ctx, &pong_count, MY_PONG, pong_message);
+	messaging_register_tmp(msg_client_ctx, &pong_count, pong_message, &msg_pong);
 
 	tv = timeval_current();
 
@@ -99,8 +101,8 @@
 		data.data = discard_const_p(uint8_t, "testing");
 		data.length = strlen((const char *)data.data);
 
-		status1 = messaging_send(msg_client_ctx, 1, MY_PING, &data);
-		status2 = messaging_send(msg_client_ctx, 1, MY_PING, NULL);
+		status1 = messaging_send(msg_client_ctx, 1, msg_ping, &data);
+		status2 = messaging_send(msg_client_ctx, 1, msg_ping, NULL);
 
 		if (!NT_STATUS_IS_OK(status1)) {
 			printf("msg1 failed - %s\n", nt_errstr(status1));
@@ -126,7 +128,7 @@
 	}
 
 	printf("sending exit\n");
-	messaging_send(msg_client_ctx, 1, MY_EXIT, NULL);
+	messaging_send(msg_client_ctx, 1, msg_exit, NULL);
 
 	if (ping_count != pong_count) {
 		printf("ping test failed! received %d, sent %d\n", 



More information about the samba-cvs mailing list