[SCM] Samba Shared Repository - branch master updated

Volker Lendecke vlendec at samba.org
Thu Oct 6 00:30:03 UTC 2016


The branch, master has been updated
       via  d02909f s3: lib: messaging. Add function comments I needed to understand this code.
      from  eb75553 s3-printing: fix migrate printer code (bug 8618)

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit d02909f3e07bd78103367de8d74429af5e802020
Author: Jeremy Allison <jra at samba.org>
Date:   Wed Oct 5 10:46:13 2016 -0700

    s3: lib: messaging. Add function comments I needed to understand this code.
    
    Signed-off-by: Jeremy Allison <jra at samba.org>
    Reviewed-by: Volker Lendecke <vl at samba.org>
    
    Autobuild-User(master): Volker Lendecke <vl at samba.org>
    Autobuild-Date(master): Thu Oct  6 02:29:41 CEST 2016 on sn-devel-144

-----------------------------------------------------------------------

Summary of changes:
 source3/lib/messages_dgm.c | 125 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)


Changeset truncated at 500 lines:

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 39b779b..6bdd589 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -141,6 +141,11 @@ static void close_fd_array(int *fds, size_t num_fds)
 	}
 }
 
+/*
+ * The idle handler can free the struct messaging_dgm_out *,
+ * if it's unused (qlen of zero) which closes the socket.
+ */
+
 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
 					   struct tevent_timer *te,
 					   struct timeval current_time,
@@ -158,6 +163,11 @@ static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
 	}
 }
 
+/*
+ * Set up the idle handler to fire after 1 second if the
+ * queue length is zero.
+ */
+
 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
 {
 	size_t qlen;
@@ -189,6 +199,11 @@ static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
 					   struct timeval current_time,
 					   void *private_data);
 
+/*
+ * Connect to an existing rendezvous point for another
+ * pid - wrapped inside a struct messaging_dgm_out *.
+ */
+
 static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
 				    struct messaging_dgm_context *ctx,
 				    pid_t pid, struct messaging_dgm_out **pout)
@@ -277,6 +292,12 @@ static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
 	return 0;
 }
 
+/*
+ * Find the struct messaging_dgm_out * to talk to pid.
+ * If we don't have one, create it. Set the timer to
+ * delete after 1 sec.
+ */
+
 static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
 				 struct messaging_dgm_out **pout)
 {
@@ -302,6 +323,13 @@ static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
 	return 0;
 }
 
+/*
+ * This function is called directly to send a message fragment
+ * when the outgoing queue is zero, and from a pthreadpool
+ * job thread when messages are being queued (qlen != 0).
+ * Make sure *ONLY* thread-safe functions are called within.
+ */
+
 static ssize_t messaging_dgm_sendmsg(int sock,
 				     const struct iovec *iov, int iovlen,
 				     const int *fds, size_t num_fds,
@@ -365,6 +393,13 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
 static void messaging_dgm_out_threaded_job(void *private_data);
 static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
 
+/*
+ * Push a message fragment onto a queue to be sent by a
+ * threadpool job. Makes copies of data/fds to be sent.
+ * The running tevent_queue internally creates an immediate
+ * event to schedule the write.
+ */
+
 static struct tevent_req *messaging_dgm_out_queue_send(
 	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 	struct messaging_dgm_out *out,
@@ -467,6 +502,11 @@ static int messaging_dgm_out_queue_state_destructor(
 	return 0;
 }
 
+/*
+ * tevent_queue callback that schedules the pthreadpool to actually
+ * send the queued message fragment.
+ */
+
 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
 					   void *private_data)
 {
@@ -485,6 +525,11 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
 				req);
 }
 
+/*
+ * Wrapper function run by the pthread that calls
+ * messaging_dgm_sendmsg() to actually do the sendmsg().
+ */
+
 static void messaging_dgm_out_threaded_job(void *private_data)
 {
 	struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
@@ -498,6 +543,10 @@ static void messaging_dgm_out_threaded_job(void *private_data)
 					    state->fds, num_fds, &state->err);
 }
 
+/*
+ * Pick up the results of the pthread sendmsg().
+ */
+
 static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
 {
 	struct tevent_req *req = tevent_req_callback_data(
@@ -532,6 +581,14 @@ static int messaging_dgm_out_queue_recv(struct tevent_req *req)
 
 static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
 
+/*
+ * Core function to send a message fragment given a
+ * connected struct messaging_dgm_out * destination.
+ * If there is no current queue, tries to send nonblocking
+ * directly. If not, queues the fragment (which makes
+ * a copy of it) and adds a 60-second timeout on the send.
+ */
+
 static int messaging_dgm_out_send_fragment(
 	struct tevent_context *ev, struct messaging_dgm_out *out,
 	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
@@ -581,6 +638,11 @@ static int messaging_dgm_out_send_fragment(
 	return 0;
 }
 
+/*
+ * Pick up the result of the fragment send. Reset idle timer
+ * if the queue is empty.
+ */
+
 static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
 {
 	struct messaging_dgm_out *out = tevent_req_callback_data(
@@ -605,6 +667,33 @@ struct messaging_dgm_fragment_hdr {
 	int sock;
 };
 
+/*
+ * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
+ * size chunks and send it.
+ *
+ * Message fragments are prefixed by a 64-bit cookie that
+ * stays the same for all fragments. This allows the receiver
+ * to recognise fragments of the same message and re-assemble
+ * them on the other end.
+ *
+ * Note that this allows other message fragments from other
+ * senders to be interleaved in the receive read processing,
+ * the combination of the cookie and header info allows unique
+ * identification of the message from a specific sender in
+ * re-assembly.
+ *
+ * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
+ * then send a single message with cookie set to zero.
+ *
+ * Otherwise the message is fragmented into chunks and added
+ * to the sending queue. Any file descriptors are passed only
+ * in the last fragment.
+ *
+ * Finally the cookie is incremented (wrap over zero) to
+ * prepare for the next message sent to this channel.
+ *
+ */
+
 static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
 					     struct messaging_dgm_out *out,
 					     const struct iovec *iov,
@@ -837,6 +926,12 @@ static void messaging_dgm_read_handler(struct tevent_context *ev,
 				       uint16_t flags,
 				       void *private_data);
 
+/*
+ * Create the rendezvous point in the file system
+ * that other processes can use to send messages to
+ * this pid.
+ */
+
 int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t *punique,
 		       const char *socket_dir,
@@ -948,6 +1043,11 @@ fail_nomem:
 	return ENOMEM;
 }
 
+/*
+ * Remove the rendezvous point in the filesystem
+ * if we're the owner.
+ */
+
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 {
 	while (c->outsocks != NULL) {
@@ -1004,6 +1104,11 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
 			       uint8_t *msg, size_t msg_len,
 			       int *fds, size_t num_fds);
 
+/*
+ * Raw read callback handler - passes to messaging_dgm_recv()
+ * for fragment reassembly processing.
+ */
+
 static void messaging_dgm_read_handler(struct tevent_context *ev,
 				       struct tevent_fd *fde,
 				       uint16_t flags,
@@ -1078,6 +1183,12 @@ static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
 	return 0;
 }
 
+/*
+ * Deals with identification of fragmented messages and
+ * re-assembly into full messages sent, then calls the
+ * callback.
+ */
+
 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
 			       struct tevent_context *ev,
 			       uint8_t *buf, size_t buflen,
@@ -1387,6 +1498,20 @@ static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
 	return 0;
 }
 
+/*
+ * Reference counter for a struct tevent_fd messaging read event
+ * (with callback function) on a struct tevent_context registered
+ * on a messaging context.
+ *
+ * If we've already registered this struct tevent_context before
+ * (so already have a read event), just increase the reference count.
+ *
+ * Otherwise create a new struct tevent_fd messaging read event on the
+ * previously unseen struct tevent_context - this is what drives
+ * the message receive processing.
+ *
+ */
+
 struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
 	TALLOC_CTX *mem_ctx, struct tevent_context *ev)
 {


-- 
Samba Shared Repository



More information about the samba-cvs mailing list