25 #include <sys/utsname.h>
26 #include <sys/socket.h>
30 # include <corosync/confdb.h>
31 # include <corosync/corodefs.h>
32 # include <corosync/cpg.h>
33 # include <corosync/cfg.h>
37 # include <corosync/cmap.h>
42 cman_handle_t pcmk_cman_handle = NULL;
56 static bool valid_cman_name(
const char *name,
uint32_t nodeid)
65 crm_notice(
"Ignoring inferred name from cman: %s", fakename);
85 if(
id) *
id = local_id;
86 if(uname) *uname = strdup(local_uname);
94 iov.iov_base = &header;
95 iov.iov_len = header.size;
99 rc = coroipcc_msg_send_reply_receive(
ais_ipc_handle, &iov, 1, &answer,
sizeof(answer));
102 crm_err(
"Odd message: id=%d, size=%d, error=%d",
103 answer.header.id, answer.header.size, answer.header.error));
105 crm_err(
"Bad response id: %d", answer.header.id));
108 if ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20) {
110 crm_info(
"Peer overloaded: Re-sending message (Attempt %d of 20)", retries);
116 crm_err(
"Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc));
119 }
else if (answer.header.error != CS_OK) {
120 crm_err(
"Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc));
124 crm_info(
"Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname);
126 local_id = answer.id;
127 local_uname = strdup(answer.uname);
129 if(
id) *
id = local_id;
130 if(uname) *uname = strdup(local_uname);
140 char *buf = malloc(buf_len);
150 if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
152 crm_info(
"Peer overloaded or membership in flux:"
153 " Re-sending message (Attempt %d of 20)", retries);
158 rc = coroipcc_msg_send_reply_receive(
ais_ipc_handle, iov, 1, buf, buf_len);
160 }
while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20);
164 crm_err(
"Odd message: id=%d, size=%d, class=%d, error=%d",
165 header->id, header->size,
class, header->error));
169 crm_err(
"Bad response id (%d) for request (%d)", header->id,
170 ais_msg->header.
id));
171 CRM_CHECK(header->error == CS_OK, rc = header->error);
174 crm_perror(LOG_ERR,
"Sending plugin message %d FAILED: %s (%d)",
175 ais_msg->
id, ais_error2text(rc), rc);
178 free(iov[0].iov_base);
182 return (rc == CS_OK);
188 crm_info(
"Disconnecting from Corosync");
203 if (pcmk_cman_handle) {
205 if (cman_stop_notification(pcmk_cman_handle) >= 0) {
207 cman_finish(pcmk_cman_handle);
225 xmlNode *member = NULL;
226 const char *value = NULL;
231 crm_err(
"Invalid membership update: %s", msg->
data);
246 crm_notice(
"Membership %s: quorum %s", value, quorate ?
"acquired" :
"lost");
250 crm_info(
"Membership %s: quorum %s", value, quorate ?
"retained" :
"still lost");
253 for (member = __xml_first_child(xml); member != NULL; member = __xml_next(member)) {
271 crm_update_peer(__FUNCTION__,
id, born, seen, votes, procs, uname, uname, addr, state);
278 plugin_default_deliver_message(cpg_handle_t handle,
279 const struct cpg_name *groupName,
283 const char *from = NULL;
299 if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
303 crm_perror(LOG_ERR,
"Receiving message body failed: (%d) %s", rc, ais_error2text(rc));
306 if (buffer == NULL) {
314 if (cluster && cluster->cpg.cpg_deliver_fn) {
315 cluster->cpg.cpg_deliver_fn(0, NULL, 0, 0, buffer, 0);
318 plugin_default_deliver_message(0, NULL, 0, 0, buffer, 0);
329 plugin_destroy(gpointer user_data)
331 crm_err(
"AIS connection terminated");
339 pcmk_cman_dispatch(gpointer user_data)
341 int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL);
344 crm_err(
"Connection to cman failed: %d", rc);
345 pcmk_cman_handle = 0;
351 # define MAX_NODES 256
354 cman_event_callback(cman_handle_t handle,
void *privdata,
int reason,
int arg)
356 int rc = 0, lpc = 0, node_count = 0;
358 cman_cluster_t cluster;
359 static cman_node_t cman_nodes[MAX_NODES];
361 gboolean(*dispatch) (
unsigned long long, gboolean) = privdata;
364 case CMAN_REASON_STATECHANGE:
366 memset(&cluster, 0,
sizeof(cluster));
367 rc = cman_get_cluster(pcmk_cman_handle, &cluster);
369 crm_err(
"Couldn't query cman cluster details: %d %d", rc, errno);
380 arg ?
"retained" :
"still lost");
383 memset(cman_nodes, 0, MAX_NODES *
sizeof(cman_node_t));
384 rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes);
386 crm_err(
"Couldn't query cman node list: %d %d", rc, errno);
390 for (lpc = 0; lpc < node_count; lpc++) {
392 const char *name = NULL;
394 if (cman_nodes[lpc].cn_nodeid == 0) {
400 if(valid_cman_name(cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_nodeid)) {
401 name = cman_nodes[lpc].cn_name;
405 if(cman_nodes[lpc].cn_member) {
408 }
else if(peer->
state) {
412 crm_info(
"State of node %s[%u] is still unknown", peer->
uname, peer->
id);
421 case CMAN_REASON_TRY_SHUTDOWN:
423 crm_notice(
"CMAN wants to shut down: %s", arg ?
"forced" :
"optional");
424 cman_replyto_shutdown(pcmk_cman_handle, 0);
427 case CMAN_REASON_CONFIG_UPDATE:
438 int rc = -1, fd = -1;
439 cman_cluster_t cluster;
446 crm_info(
"Configuring Pacemaker to obtain quorum from cman");
448 memset(&cluster, 0,
sizeof(cluster));
450 pcmk_cman_handle = cman_init(
dispatch);
451 if (pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) {
452 crm_err(
"Couldn't connect to cman");
456 rc = cman_start_notification(pcmk_cman_handle, cman_event_callback);
458 crm_err(
"Couldn't register for cman notifications: %d %d", rc, errno);
463 cman_event_callback(pcmk_cman_handle,
dispatch, CMAN_REASON_STATECHANGE,
464 cman_is_quorate(pcmk_cman_handle));
466 fd = cman_get_fd(pcmk_cman_handle);
472 cman_finish(pcmk_cman_handle);
476 crm_err(
"cman qorum is not supported in this build");
482 # ifdef SUPPORT_COROSYNC
488 crm_err(
"The Corosync quorum API is not supported in this build");
499 const char *name = NULL;
508 crm_info(
"Creating connection to our Corosync plugin");
515 crm_info(
"Connection to our Corosync plugin (%d) failed: %s (%d)",
520 crm_err(
"No context created, but connection reported 'ok'");
525 ais_error2text(rc), rc);
532 if (ais_fd_callbacks.
destroy == NULL) {
533 ais_fd_callbacks.
destroy = plugin_destroy;
537 crm_info(
"AIS connection established");
547 plugin_get_details(NULL, &(cluster->
uname));
549 crm_crit(
"Node name mismatch! Corosync supplied %s but our lookup returned %s",
550 cluster->
uname, name);
552 (
"Node name mismatches usually occur when assigned automatically by DHCP servers");
564 pcmk_mcp_dispatch(
const char *buffer, ssize_t length, gpointer userdata)
569 xmlNode *node = NULL;
571 for (node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) {
593 pcmk_mcp_destroy(gpointer user_data)
595 void (*callback) (gpointer
data) = user_data;
609 .destroy = pcmk_mcp_destroy
612 while (retries < 5) {
622 cluster->
destroy, &mcp_callbacks);
629 case CS_ERR_TRY_AGAIN:
630 case CS_ERR_QUEUE_FULL:
638 crm_err(
"Retry count exceeded: %d", retries);
657 cman = cman_init(NULL);
658 if (cman != NULL && cman_is_active(cman)) {
660 memset(&us, 0,
sizeof(cman_node_t));
661 cman_get_node(cman, nodeid, &us);
662 if(valid_cman_name(us.cn_name, nodeid)) {
663 name = strdup(us.cn_name);
664 crm_info(
"Using CMAN node name %s for %u", name, nodeid);
671 crm_debug(
"Unable to get node name for nodeid %u", nodeid);
689 if (init_cs_connection_classic(cluster) == FALSE) {
699 crm_info(
"Could not find an active corosync based cluster");
711 if(cluster->
nodeid == 0) {
712 crm_err(
"Could not establish local nodeid");
717 if(cluster->
uname == NULL) {
718 crm_err(
"Could not establish local node name");
732 gboolean sane = TRUE;
736 if (sane && msg->header.
size == 0) {
741 if (sane && msg->header.error != CS_OK) {
742 crm_warn(
"Message header contains an error: %d", msg->header.error);
753 crm_warn(
"Message with no payload");
758 int str_size = strlen(data) + 1;
763 crm_warn(
"Message payload is corrupted: expected %d bytes, got %d",
766 for (lpc = (str_size - 10); lpc < msg->
size; lpc++) {
770 crm_debug(
"bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
776 crm_err(
"Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
777 msg->
id, ais_dest(&(msg->
host)), msg_type2text(dest),
783 (
"Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
784 msg->
id, ais_dest(&(msg->
host)), msg_type2text(dest), ais_dest(&(msg->
sender)),
795 hdb_handle_t object_handle,
const char *key,
char **value,
const char *fallback)
798 char *env_key = NULL;
799 const char *env_value = NULL;
807 if (object_handle > 0) {
808 if (CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) {
809 *value = strdup(buffer);
814 crm_info(
"Found '%s' for option: %s", *value, key);
819 env_value = getenv(env_key);
823 crm_info(
"Found '%s' in ENV for option: %s", *value, key);
824 *value = strdup(env_value);
829 crm_info(
"Defaulting to '%s' for option: %s", fallback, key);
830 *value = strdup(fallback);
833 crm_info(
"No default for option: %s", key);
839 static confdb_handle_t
842 cs_error_t rc = CS_OK;
843 confdb_handle_t local_handle = OBJECT_PARENT_HANDLE;
845 rc = confdb_object_find_start(config, local_handle);
849 crm_err(
"Couldn't create search context: %d", rc);
855 config_find_next(confdb_handle_t config,
const char *name, confdb_handle_t top_handle)
857 cs_error_t rc = CS_OK;
858 hdb_handle_t local_handle = 0;
860 if (top_handle == 0) {
861 crm_err(
"Couldn't search for %s: no valid context", name);
865 crm_trace(
"Searching for %s in " HDB_X_FORMAT, name, top_handle);
866 rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle);
868 crm_info(
"No additional configuration supplied for: %s", name);
871 crm_info(
"Processing additional %s options...", name);
879 confdb_handle_t config;
884 confdb_handle_t top_handle = 0;
885 hdb_handle_t local_handle = 0;
886 static confdb_callbacks_t callbacks = { };
888 rc = confdb_initialize(&config, &callbacks);
890 crm_debug(
"Could not initialize Cluster Configuration Database API instance error %d", rc);
896 while (local_handle) {
902 crm_trace(
"Found Pacemaker plugin version: %s", value);
921 confdb_finalize(config);
924 (
"Corosync is running, but Pacemaker could not find the CMAN or Pacemaker plugin loaded");
952 crm_debug(
"%s: unknown process list, assuming active for now", node->
uname);
bool send_plugin_text(int class, struct iovec *iov)
enum crm_ais_msg_types type
#define CRM_CHECK(expr, failure_action)
#define crm_notice(fmt, args...)
#define crm_crit(fmt, args...)
gboolean safe_str_neq(const char *a, const char *b)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
hdb_handle_t config_find_init(struct corosync_api_v1 *config, char *name)
char * get_corosync_uuid(crm_node_t *peer)
void terminate_cs_connection(crm_cluster_t *cluster)
const char * get_local_node_name(void)
void(* destroy)(gpointer)
long long crm_int_helper(const char *text, char **end_text)
struct mainloop_io_s mainloop_io_t
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
int(* dispatch)(gpointer userdata)
char * get_node_name(uint32_t nodeid)
char * strerror(int errnum)
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
#define AIS_IPC_MESSAGE_SIZE
gboolean cluster_connect_quorum(gboolean(*dispatch)(unsigned long long, gboolean), void(*destroy)(gpointer))
Wrappers for and extensions to glib mainloop.
xmlNode * string2xml(const char *input)
crm_node_t * crm_update_peer(const char *source, unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, const char *addr, const char *state)
gboolean init_cs_connection(crm_cluster_t *cluster)
hdb_handle_t ais_ipc_handle
gboolean crm_is_corosync_peer_active(const crm_node_t *node)
int plugin_dispatch(gpointer user_data)
void plugin_handle_membership(AIS_Message *msg)
void cluster_disconnect_cpg(crm_cluster_t *cluster)
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
void(* destroy)(gpointer userdata)
int get_config_opt(struct corosync_api_v1 *config, hdb_handle_t object_service_handle, char *key, char **value, const char *fallback)
#define crm_trace(fmt, args...)
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
#define CRM_MESSAGE_IPC_ACK
gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
xmlNode * create_xml_node(xmlNode *parent, const char *name)
struct crm_ais_msg_s AIS_Message
int crm_element_value_int(xmlNode *data, const char *name, int *dest)
const char * crm_element_value(xmlNode *data, const char *name)
#define ais_data_len(msg)
struct qb_ipc_response_header cs_ipc_header_response_t
enum cluster_type_e find_corosync_variant(void)
unsigned long long crm_peer_seq
gboolean ais_membership_force
gboolean is_cman_cluster(void)
void free_xml(xmlNode *child)
gboolean crm_str_eq(const char *a, const char *b, gboolean use_case)
const char * name_for_cluster_type(enum cluster_type_e type)
int set_cluster_type(enum cluster_type_e type)
#define DAEMON_RESPAWN_STOP
gboolean init_cs_connection_once(crm_cluster_t *cluster)
hdb_handle_t config_find_next(struct corosync_api_v1 *config, char *name, hdb_handle_t top_handle)
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node's state and membership information.
#define crm_log_xml_err(xml, text)
#define crm_perror(level, fmt, args...)
Log a system error message.
uint32_t get_local_nodeid(cpg_handle_t handle)
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
#define crm_err(fmt, args...)
#define G_PRIORITY_MEDIUM
int crm_ipc_send(crm_ipc_t *client, xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply)
Wrappers for and extensions to libqb IPC.
gboolean crm_is_true(const char *s)
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
char * crm_concat(const char *prefix, const char *suffix, char join)
gboolean init_cman_connection(gboolean(*dispatch)(unsigned long long, gboolean), void(*destroy)(gpointer))
char * crm_itoa(int an_int)
#define safe_str_eq(a, b)
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
#define crm_info(fmt, args...)
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
gboolean is_classic_ais_cluster(void)
enum crm_ais_msg_types type
enum cluster_type_e get_cluster_type(void)