[SCM] CTDB repository - branch libctdb updated - ctdb-1.0.114-91-g0ecfea2
Ronnie Sahlberg
sahlberg at samba.org
Tue May 11 22:10:38 MDT 2010
The branch, libctdb has been updated
via 0ecfea29e46dbb2c3892ddafddb82039c6416fe1 (commit)
from 7655aaba88d1c3cbc644097f906c112fce02f058 (commit)
http://gitweb.samba.org/?p=sahlberg/ctdb.git;a=shortlog;h=libctdb
- Log -----------------------------------------------------------------
commit 0ecfea29e46dbb2c3892ddafddb82039c6416fe1
Author: Ronnie Sahlberg <ronniesahlberg at gmail.com>
Date: Wed May 12 14:09:04 2010 +1000
ooops, add a file that was missing
-----------------------------------------------------------------------
Summary of changes:
libctdb/ctdb_client.c | 683 +++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 683 insertions(+), 0 deletions(-)
create mode 100644 libctdb/ctdb_client.c
Changeset truncated at 500 lines:
diff --git a/libctdb/ctdb_client.c b/libctdb/ctdb_client.c
new file mode 100644
index 0000000..c7ce138
--- /dev/null
+++ b/libctdb/ctdb_client.c
@@ -0,0 +1,683 @@
+/*
+ ctdb client code
+
+ Copyright (C) Andrew Tridgell 2007
+ Copyright (C) Ronnie Sahlberg 2007-2010
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/util/dlinklist.h"
+#include "lib/events/events.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/locale.h"
+#include <stdlib.h>
+#include "include/ctdb_protocol.h"
+#include "include/ctdb_private.h"
+#include "lib/util/dlinklist.h"
+
+
+/*
+  allocate a packet for use in client<->daemon communication
+
+  ctdb      - supplies the source node number (ctdb->pnn) and, when a vnn
+              map is present, the generation stamped into the header
+  mem_ctx   - talloc parent of the new packet
+  operation - stored in hdr->operation
+  length    - requested packet length in bytes
+  slength   - lower bound applied to length
+              (NOTE(review): presumably the size of the fixed packet
+              structure - confirm against the callers)
+  type      - talloc name given to the allocation
+
+  Returns the packet with its header filled in, or NULL if the allocation
+  failed.  The whole allocation is zeroed, including the alignment padding
+  that follows hdr->length, so no uninitialised heap bytes are left in it.
+ */
+struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
+					    TALLOC_CTX *mem_ctx,
+					    enum ctdb_operation operation,
+					    size_t length, size_t slength,
+					    const char *type)
+{
+	/* size_t, not int: the rounded size must not be narrowed before it
+	   is handed to the allocator */
+	size_t size;
+	struct ctdb_req_header *hdr;
+
+	length = MAX(length, slength);
+	/* round up to the alignment (the mask form assumes
+	   CTDB_DS_ALIGNMENT is a power of two) */
+	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+	hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
+	if (hdr == NULL) {
+		DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
+			 operation, (unsigned)length));
+		return NULL;
+	}
+	talloc_set_name_const(hdr, type);
+	/* clear everything that was allocated, not just the first slength
+	   bytes */
+	memset(hdr, 0, size);
+	hdr->length = length;
+	hdr->operation = operation;
+	hdr->ctdb_magic = CTDB_MAGIC;
+	hdr->ctdb_version = CTDB_VERSION;
+	hdr->srcnode = ctdb->pnn;
+	if (ctdb->vnn_map) {
+		hdr->generation = ctdb->vnn_map->generation;
+	}
+
+	return hdr;
+}
+
+/*
+  local version of ctdb_call
+
+  Finds the call registered on ctdb_db whose id equals call->call_id and
+  runs it against a private copy of the record passed in *data.
+
+  On success:
+   - header->laccessor is set to caller and header->lacount is bumped
+     (the count restarts from zero when the accessor changes)
+   - the record is written with ctdb_ltdb_store() if the call produced
+     new data, or if the accessor is not this node (ctdb->pnn)
+   - any reply data is moved onto call (call->reply_data) and
+     call->status is set from the call's status
+
+  mem_ctx is not used by this function.
+
+  Returns 0 on success, -1 on failure.  Failures after the initial
+  allocation record a message with ctdb_set_error().
+*/
+int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
+		    struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
+		    TDB_DATA *data, uint32_t caller)
+{
+	struct ctdb_call_info *c;
+	struct ctdb_registered_call *fn;
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+
+	c = talloc(ctdb, struct ctdb_call_info);
+	CTDB_NO_MEMORY(ctdb, c);
+
+	c->key = call->key;
+	c->call_data = &call->call_data;
+	c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
+	c->record_data.dsize = data->dsize;
+	if (c->record_data.dptr == NULL) {
+		/* c hangs off the ctdb context, so it has to be released
+		   here; bailing out through CTDB_NO_MEMORY() (defined
+		   elsewhere, presumably an early return) would leave it
+		   allocated */
+		ctdb_set_error(ctdb, "Out of memory copying record in ctdb_call_local\n");
+		talloc_free(c);
+		return -1;
+	}
+	c->new_data = NULL;
+	c->reply_data = NULL;
+	c->status = 0;
+
+	/* look up the registered function for this call id */
+	for (fn=ctdb_db->calls;fn;fn=fn->next) {
+		if (fn->id == call->call_id) break;
+	}
+	if (fn == NULL) {
+		ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
+		talloc_free(c);
+		return -1;
+	}
+
+	if (fn->fn(c) != 0) {
+		ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
+		talloc_free(c);
+		return -1;
+	}
+
+	/* track who accessed the record last and how many times in a row */
+	if (header->laccessor != caller) {
+		header->lacount = 0;
+	}
+	header->laccessor = caller;
+	header->lacount++;
+
+	/* we need to force the record to be written out if this was a remote access,
+	   so that the lacount is updated */
+	if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
+		c->new_data = &c->record_data;
+	}
+
+	if (c->new_data) {
+		/* XXX check that we always have the lock here? */
+		if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
+			ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
+			talloc_free(c);
+			return -1;
+		}
+	}
+
+	if (c->reply_data) {
+		call->reply_data = *c->reply_data;
+
+		/* reparent the reply buffer onto call so it is not tied to c,
+		   which is freed below */
+		talloc_steal(call, call->reply_data.dptr);
+		talloc_set_name_const(call->reply_data.dptr, __location__);
+	} else {
+		call->reply_data.dptr = NULL;
+		call->reply_data.dsize = 0;
+	}
+	call->status = c->status;
+
+	talloc_free(c);
+
+	return 0;
+}
+
+
+/*
+  async version of receive control reply
+
+  Runs the event loop until the control described by state leaves the
+  CTDB_CONTROL_WAIT state, then hands the result to the caller.
+
+  ctdb     - context whose event loop (ctdb->ev) is run while waiting
+  state    - the pending control; NULL is rejected with -1.  It is
+             reparented onto a temporary context that is freed before
+             this function returns.
+  mem_ctx  - talloc parent for the copies returned through outdata and
+             errormsg
+  outdata  - optional; receives a copy of the reply data
+  status   - optional; preset to -1, set to the reply status on success
+  errormsg - optional; preset to NULL, receives the error string if the
+             reply carried one
+
+  Returns 0 on success.  Returns -1 if state is NULL, if the control did
+  not end in CTDB_CONTROL_DONE, if the reply carried an error message, or
+  if the reply data could not be copied.  Whenever state is not NULL its
+  async.fn callback, if set, is invoked once before returning.
+*/
+int ctdb_control_recv(struct ctdb_context *ctdb,
+		      struct ctdb_client_control_state *state,
+		      TALLOC_CTX *mem_ctx,
+		      TDB_DATA *outdata, int32_t *status, char **errormsg)
+{
+	TALLOC_CTX *tmp_ctx;
+	int ret = -1;
+
+	if (status != NULL) {
+		*status = -1;
+	}
+	if (errormsg != NULL) {
+		*errormsg = NULL;
+	}
+
+	if (state == NULL) {
+		return -1;
+	}
+
+	/* prevent double free of state */
+	tmp_ctx = talloc_new(ctdb);
+	talloc_steal(tmp_ctx, state);
+
+	/* loop one event at a time until we either timeout or the control
+	   completes.
+	*/
+	while (state->state == CTDB_CONTROL_WAIT) {
+		event_loop_once(ctdb->ev);
+	}
+
+	if (state->state != CTDB_CONTROL_DONE) {
+		DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
+		goto done;
+	}
+
+	if (state->errormsg) {
+		DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
+		if (errormsg) {
+			(*errormsg) = talloc_move(mem_ctx, &state->errormsg);
+		}
+		goto done;
+	}
+
+	if (outdata) {
+		*outdata = state->outdata;
+		outdata->dptr = talloc_memdup(mem_ctx, state->outdata.dptr,
+					      state->outdata.dsize);
+		if (outdata->dptr == NULL && outdata->dsize != 0) {
+			/* do not report success with a NULL buffer and a
+			   non-zero length */
+			DEBUG(DEBUG_ERR,(__location__ " Failed to copy control reply data\n"));
+			outdata->dsize = 0;
+			goto done;
+		}
+	}
+
+	if (status) {
+		*status = state->status;
+	}
+
+	ret = 0;
+
+done:
+	/* single exit: notify the callback (if any), then drop the
+	   temporary context together with whatever still hangs off it */
+	if (state->async.fn) {
+		state->async.fn(state);
+	}
+
+	talloc_free(tmp_ctx);
+	return ret;
+}
+
+
+/*
+  called when a control completes or times out to invoke the callback
+  function the user provided
+
+  Timed-event handler; private_data is the ctdb_client_control_state of
+  the control.  ev, te and t are not used.
+*/
+static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
+	struct timeval t, void *private_data)
+{
+	struct ctdb_client_control_state *state;
+	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+
+	state = talloc_get_type(private_data, struct ctdb_client_control_state);
+	talloc_steal(tmp_ctx, state);
+
+	/* ctdb_control_recv() is what actually runs state->async.fn; its
+	   return value carries nothing the callback has not already been
+	   given, so it is deliberately ignored */
+	(void)ctdb_control_recv(state->ctdb, state, state,
+				NULL,
+				NULL,
+				NULL);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+  called when a CTDB_REPLY_CONTROL packet comes in in the client
+
+  This packet comes in response to a CTDB_REQ_CONTROL request packet. It
+  contains any reply data from the control
+
+  The reply is matched to its pending control through hdr->reqid.  Packets
+  with an unknown or mismatching reqid, or whose length fields do not fit
+  inside the packet, are logged and dropped.  Otherwise the reply data,
+  status and optional error string are recorded on the control state, the
+  packet is reparented onto that state, and the state is marked
+  CTDB_CONTROL_DONE.
+*/
+static void ctdb_client_reply_control(struct ctdb_context *ctdb,
+				      struct ctdb_req_header *hdr)
+{
+	struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
+	struct ctdb_client_control_state *state;
+
+	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
+	if (state == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
+		return;
+	}
+
+	if (hdr->reqid != state->reqid) {
+		/* we found a record but it was the wrong one */
+		DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
+		return;
+	}
+
+	/* datalen and errorlen come off the wire and are used below as an
+	   offset and a length into the packet, so make sure the packet is
+	   large enough to hold the fixed part and everything they describe.
+	   The sum is done in 64 bits so it cannot wrap. */
+	if (hdr->length < offsetof(struct ctdb_reply_control, data) ||
+	    (uint64_t)offsetof(struct ctdb_reply_control, data) +
+	    c->datalen + c->errorlen > hdr->length) {
+		DEBUG(DEBUG_ERR,(__location__ " Dropped malformed reply control with reqid:%u length:%u\n",
+				 hdr->reqid, (unsigned)hdr->length));
+		return;
+	}
+
+	state->outdata.dptr = c->data;
+	state->outdata.dsize = c->datalen;
+	state->status = c->status;
+	if (c->errorlen) {
+		/* the error string follows the reply data in the packet */
+		state->errormsg = talloc_strndup(state,
+						 (char *)&c->data[c->datalen],
+						 c->errorlen);
+	}
+
+	/* state->outdata now uses resources from c so we don't want c
+	   to just disappear from under us while state is still alive
+	*/
+	talloc_steal(state, c);
+
+	state->state = CTDB_CONTROL_DONE;
+
+	/* if we had a callback registered for this control, pull the response
+	   and call the callback.
+	*/
+	if (state->async.fn) {
+		event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
+	}
+}
+
+
+/*
+  called when a CTDB_REPLY_CALL packet comes in in the client
+
+  This packet comes in response to a CTDB_REQ_CALL request packet. It
+  contains any reply data from the call
+
+  The reply is matched to its pending call through hdr->reqid.  Packets
+  with an unknown or mismatching reqid, or whose data length does not fit
+  inside the packet, are logged and dropped.  Otherwise the reply data and
+  status are recorded on state->call, the packet is reparented onto the
+  state, the state is marked CTDB_CALL_DONE and the async callback, if
+  any, is invoked directly.
+*/
+static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+	struct ctdb_client_call_state *state;
+
+	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
+	if (state == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
+		return;
+	}
+
+	if (hdr->reqid != state->reqid) {
+		/* we found a record but it was the wrong one */
+		DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
+		return;
+	}
+
+	/* datalen comes off the wire and becomes the length of the reply
+	   buffer handed to the caller, so make sure the packet really holds
+	   the fixed part plus that many bytes.  The sum is done in 64 bits
+	   so it cannot wrap. */
+	if (hdr->length < offsetof(struct ctdb_reply_call, data) ||
+	    (uint64_t)offsetof(struct ctdb_reply_call, data) +
+	    c->datalen > hdr->length) {
+		DEBUG(DEBUG_ERR,(__location__ " Dropped malformed client call reply with reqid:%u length:%u\n",
+				 hdr->reqid, (unsigned)hdr->length));
+		return;
+	}
+
+	state->call->reply_data.dptr = c->data;
+	state->call->reply_data.dsize = c->datalen;
+	state->call->status = c->status;
+
+	/* reply_data points into c, so keep c alive as long as state */
+	talloc_steal(state, c);
+
+	state->state = CTDB_CALL_DONE;
+
+	if (state->async.fn) {
+		state->async.fn(state);
+	}
+}
+
+/*
+  read callback of the client: invoked with each chunk of data that
+  arrives from the daemon.
+
+  data/cnt describe the received packet and args is the ctdb context.
+  A zero-length read is taken to mean the daemon has gone away and ends
+  the process.  Packets failing the length, magic or version checks are
+  reported and discarded; the rest are dispatched on hdr->operation.
+ */
+static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
+{
+	struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
+	struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+	TALLOC_CTX *pkt_ctx;
+
+	/* Hang the packet off a scratch context that is freed on the way
+	   out.  A handler that needs the packet to live longer steals it
+	   to a parent of its own, in which case the free at the end no
+	   longer touches it. */
+	pkt_ctx = talloc_new(ctdb);
+	talloc_steal(pkt_ctx, hdr);
+
+	if (cnt == 0) {
+		DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
+		exit(0);
+	}
+
+	if (cnt < sizeof(*hdr)) {
+		DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
+	} else if (cnt != hdr->length) {
+		ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
+			       (unsigned)hdr->length, (unsigned)cnt);
+	} else if (hdr->ctdb_magic != CTDB_MAGIC) {
+		ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
+	} else if (hdr->ctdb_version != CTDB_VERSION) {
+		ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
+	} else {
+		/* header is sane: hand the packet to its handler */
+		switch (hdr->operation) {
+		case CTDB_REPLY_CALL:
+			ctdb_client_reply_call(ctdb, hdr);
+			break;
+
+		case CTDB_REQ_MESSAGE:
+			ctdb_request_message(ctdb, hdr);
+			break;
+
+		case CTDB_REPLY_CONTROL:
+			ctdb_client_reply_control(ctdb, hdr);
+			break;
+
+		default:
+			DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
+			break;
+		}
+	}
+
+	talloc_free(pkt_ctx);
+}
+
+
+/*
+  connect to a unix domain socket
+
+  Opens a stream socket, connects it to the path in ctdb->daemon.name and
+  attaches a packet queue (ctdb->daemon.queue) whose read callback is
+  ctdb_client_read_cb.
+
+  Returns 0 on success.  Returns -1 if the socket name does not fit into
+  sun_path, or if creating the socket, connecting it or setting up the
+  queue fails; in the last two cases the socket is closed again and
+  ctdb->daemon.sd is reset to -1.
+*/
+int ctdb_socket_connect(struct ctdb_context *ctdb)
+{
+	struct sockaddr_un addr;
+	int saved_errno;
+
+	/* refuse a name that would be truncated: connecting to a cut-off
+	   path is never what the caller asked for */
+	if (strlen(ctdb->daemon.name) >= sizeof(addr.sun_path)) {
+		DEBUG(DEBUG_ERR,(__location__ " Daemon socket name '%s' is too long\n", ctdb->daemon.name));
+		return -1;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	/* addr was zeroed above, so leaving the last byte alone keeps
+	   sun_path NUL terminated */
+	strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
+
+	ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (ctdb->daemon.sd == -1) {
+		DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
+		return -1;
+	}
+
+	set_nonblocking(ctdb->daemon.sd);
+	set_close_on_exec(ctdb->daemon.sd);
+
+	if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+		/* remember why connect() failed: close() may change errno */
+		saved_errno = errno;
+		close(ctdb->daemon.sd);
+		ctdb->daemon.sd = -1;
+		DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(saved_errno), saved_errno));
+		return -1;
+	}
+
+	ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
+					      CTDB_DS_ALIGNMENT,
+					      ctdb_client_read_cb, ctdb);
+	if (ctdb->daemon.queue == NULL) {
+		/* without a queue the connection is unusable; do not report
+		   success with a NULL queue */
+		close(ctdb->daemon.sd);
+		ctdb->daemon.sd = -1;
+		DEBUG(DEBUG_ERR,(__location__ " Failed to set up client queue\n"));
+		return -1;
+	}
+	return 0;
+}
+
+
+
+/*
+  destroy a ctdb_control in client
+
+  Destructor for a client control state: removes state->reqid from the
+  reqid table of state->ctdb via ctdb_reqid_remove().
+
+  Always returns 0.
+*/
+int ctdb_control_destructor(struct ctdb_client_control_state *state)
+{
+ ctdb_reqid_remove(state->ctdb, state->reqid);
+ return 0;
+}
+
+
+/*
+  queue a packet for sending from client to daemon
+
+  Passes hdr->length bytes starting at hdr to ctdb_queue_send() on
+  ctdb->daemon.queue and returns whatever that call returns.
+*/
+static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
+}
+
+
+/*
+  time out handler for ctdb_control
+
+  Timed-event handler; private_data is the ctdb_client_control_state of
+  the control that ran out of time.  Logs the timeout, marks the control
+  CTDB_CONTROL_TIMEOUT and, when an async callback is registered,
+  schedules invoke_control_callback() to deliver the result.
+*/
+static void control_timeout_func(struct event_context *ev, struct timed_event *te,
+	struct timeval t, void *private_data)
+{
+	struct ctdb_client_control_state *ctl;
+
+	ctl = talloc_get_type(private_data, struct ctdb_client_control_state);
+
+	DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
+			 "dstnode:%u\n", ctl->reqid, ctl->c->opcode,
+			 ctl->c->hdr.destnode));
+
+	ctl->state = CTDB_CONTROL_TIMEOUT;
+
+	/* nobody registered a callback: nothing more to do here */
+	if (!ctl->async.fn) {
+		return;
+	}
+
+	/* a callback is registered: queue an immediate timed event that
+	   pulls the response and calls it */
+	event_add_timed(ctl->ctdb->ev, ctl, timeval_zero(), invoke_control_callback, ctl);
+}
+
+/* async version of send control request */
+struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
+ uint32_t destnode, uint64_t srvid,
+ uint32_t opcode, uint32_t flags, TDB_DATA data,
+ TALLOC_CTX *mem_ctx,
+ struct timeval *timeout,
+ char **errormsg)
+{
+ struct ctdb_client_control_state *state;
+ size_t len;
+ struct ctdb_req_control *c;
+ int ret;
+
+ if (errormsg) {
+ *errormsg = NULL;
+ }
+
+ /* if the domain socket is not yet open, open it */
+ if (ctdb->daemon.sd==-1) {
+ ctdb_socket_connect(ctdb);
+ }
+
+ state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
+ CTDB_NO_MEMORY_NULL(ctdb, state);
+
+ state->ctdb = ctdb;
+ state->reqid = ctdb_reqid_new(ctdb, state);
+ state->state = CTDB_CONTROL_WAIT;
+ state->errormsg = NULL;
+
+ talloc_set_destructor(state, ctdb_control_destructor);
+
+ len = offsetof(struct ctdb_req_control, data) + data.dsize;
+ c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
+ len, struct ctdb_req_control);
+ state->c = c;
--
CTDB repository
More information about the samba-cvs
mailing list