22 #include <sys/param.h>
24 #include <sys/types.h>
41 ll_cluster_t *heartbeat_cluster = NULL;
44 convert_ha_field(xmlNode * parent,
void *msg_v,
int lpc)
47 const char *name = NULL;
48 const char *value = NULL;
50 HA_Message *msg = msg_v;
54 unsigned int used = 0;
55 char *uncompressed = NULL;
56 char *compressed = NULL;
57 int size = orig_len * 10;
62 name = msg->names[lpc];
63 type = cl_get_type(msg, name);
67 convert_ha_message(parent, msg->values[lpc], name);
71 convert_ha_message(parent, cl_get_struct(msg, name), name);
74 value = msg->values[lpc];
76 crm_trace(
"Converting %s/%d/%s", name, type, value[0] ==
'<' ?
"xml" :
"field");
78 if (value[0] !=
'<') {
86 crm_err(
"Conversion of field '%s' failed", name);
94 value = cl_get_binary(msg, name, &orig_len);
95 size = orig_len * 10 + 1;
97 if (orig_len < 3 || value[0] !=
'B' || value[1] !=
'Z' || value[2] !=
'h') {
98 if (strstr(name,
"uuid") == NULL) {
99 crm_err(
"Skipping non-bzip binary field: %s", name);
104 compressed = calloc(1, orig_len);
105 memcpy(compressed, value, orig_len);
107 crm_trace(
"Trying to decompress %d bytes", (
int)orig_len);
109 uncompressed = realloc_safe(uncompressed, size);
110 memset(uncompressed, 0, size);
116 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &used, compressed, orig_len, 1, 0);
118 if (rc == BZ_OUTBUFF_FULL) {
127 crm_err(
"Decompression of %s (%d bytes) into %d failed: %d",
128 name, (
int)orig_len, size, rc);
130 }
else if (used >= size) {
135 uncompressed[used] = 0;
151 convert_ha_message(xmlNode * parent, HA_Message * msg,
const char *field)
154 xmlNode *child = NULL;
155 const char *tag = NULL;
166 crm_debug(
"Creating intermediate parent %s between %s and %s", field,
167 crm_element_name(parent), tag);
171 if (parent == NULL) {
179 for (lpc = 0; lpc < msg->nfields; lpc++) {
180 convert_ha_field(child, msg, lpc);
187 add_ha_nocopy(HA_Message * parent, HA_Message * child,
const char *field)
189 int next = parent->nfields;
191 if (parent->nfields >= parent->nalloc && ha_msg_expand(parent) != HA_OK) {
192 crm_err(
"Parent expansion failed");
196 parent->names[next] = strdup(field);
197 parent->nlens[next] = strlen(field);
198 parent->values[next] = child;
199 parent->vlens[next] =
sizeof(HA_Message);
200 parent->types[next] = FT_UNCOMPRESS;
205 convert_xml_message_struct(HA_Message * parent, xmlNode * src_node,
const char *field)
207 xmlNode *child = NULL;
208 xmlNode *__crm_xml_iter = src_node->children;
209 xmlAttrPtr prop_iter = src_node->properties;
210 const char *name = NULL;
211 const char *value = NULL;
213 HA_Message *result = ha_msg_new(3);
215 ha_msg_add(result,
F_XML_TAGNAME, (
const char *)src_node->name);
217 while (prop_iter != NULL) {
218 name = (
const char *)prop_iter->name;
219 value = (
const char *)xmlGetProp(src_node, prop_iter->name);
220 prop_iter = prop_iter->next;
221 ha_msg_add(result, name, value);
224 while (__crm_xml_iter != NULL) {
225 child = __crm_xml_iter;
226 __crm_xml_iter = __crm_xml_iter->next;
227 convert_xml_message_struct(result, child, NULL);
230 if (parent == NULL) {
235 HA_Message *holder = ha_msg_new(3);
240 add_ha_nocopy(holder, result, (
const char *)src_node->name);
242 ha_msg_addstruct_compress(parent, field, holder);
246 add_ha_nocopy(parent, result, (
const char *)src_node->name);
252 convert_xml_child(HA_Message * msg, xmlNode * xml)
256 unsigned int len = 0;
259 char *compressed = NULL;
260 const char *name = NULL;
262 name = (
const char *)xml->name;
264 orig = strlen(buffer);
266 ha_msg_add(msg, name, buffer);
270 len = (orig * 1.1) + 600;
272 compressed = malloc(len);
276 crm_err(
"Compression failed: %d", rc);
278 convert_xml_message_struct(msg, xml, name);
284 crm_trace(
"Compression details: %d -> %d", orig, len);
285 ha_msg_addbin(msg, name, buffer, len);
291 unsigned int used = orig;
292 char *uncompressed = NULL;
294 crm_debug(
"Trying to decompress %d bytes", len);
295 uncompressed = calloc(1, orig);
296 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &used, compressed, len, 1, 0);
305 crm_debug(
"Original %s, decompressed %s", buffer, uncompressed);
312 convert_xml_message(xmlNode * xml)
314 xmlNode *child = NULL;
315 xmlAttrPtr pIter = NULL;
316 HA_Message *result = NULL;
318 result = ha_msg_new(3);
321 for (pIter = xml->properties; pIter != NULL; pIter = pIter->next) {
322 const char *p_name = (
const char *)pIter->name;
324 if (pIter->children) {
325 const char *p_value = (
const char *)pIter->children->content;
327 ha_msg_add(result, p_name, p_value);
330 for (child = __xml_first_child(xml); child != NULL; child = __xml_next(child)) {
331 convert_xml_child(result, child);
338 crm_is_heartbeat_peer_active(
const crm_node_t * node)
357 }
else if ((node->
processes & proc) == 0) {
365 crm_update_ccm_node(
const oc_ev_membership_t * oc,
int offset,
const char *state, uint64_t seq)
369 const char *uuid = NULL;
371 CRM_CHECK(oc->m_array[offset].node_uname != NULL,
return NULL);
377 oc->m_array[offset].node_born_on, seq, -1, 0,
378 uuid, oc->m_array[offset].node_uname, NULL, state);
391 const char *const_uname = heartbeat_cluster->llc_ops->get_mynodeid(heartbeat_cluster);
405 send_ha_message(ll_cluster_t * hb_conn, xmlNode * xml,
const char *node, gboolean force_ordered)
407 gboolean all_is_good = TRUE;
408 HA_Message *msg = convert_xml_message(xml);
411 crm_err(
"can't send NULL message");
414 }
else if (hb_conn == NULL) {
415 crm_err(
"No heartbeat connection specified");
418 }
else if (hb_conn->llc_ops->chan_is_connected(hb_conn) == FALSE) {
419 crm_err(
"Not connected to Heartbeat");
422 }
else if (node != NULL) {
423 char *host_lowercase = g_ascii_strdown(node, -1);
425 if (hb_conn->llc_ops->send_ordered_nodemsg(hb_conn, msg, host_lowercase) != HA_OK) {
429 free(host_lowercase);
431 }
else if (force_ordered) {
432 if (hb_conn->llc_ops->send_ordered_clustermsg(hb_conn, msg) != HA_OK) {
434 crm_err(
"Broadcast Send failed");
438 if (hb_conn->llc_ops->sendclustermsg(hb_conn, msg) != HA_OK) {
440 crm_err(
"Broadcast Send failed");
444 if (all_is_good == FALSE && hb_conn != NULL) {
445 IPC_Channel *ipc = NULL;
446 IPC_Queue *send_q = NULL;
448 if (hb_conn->llc_ops->chan_is_connected(hb_conn) != HA_OK) {
449 ipc = hb_conn->llc_ops->ipcchan(hb_conn);
453 send_q = ipc->send_queue;
455 if (send_q != NULL) {
456 CRM_CHECK(send_q->current_qlen < send_q->max_qlen,;
474 ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data)
476 IPC_Channel *channel = NULL;
480 if (cluster_conn != NULL) {
481 channel = cluster_conn->llc_ops->ipcchan(cluster_conn);
484 CRM_CHECK(cluster_conn != NULL,
return FALSE);
485 CRM_CHECK(channel != NULL,
return FALSE);
487 if (channel != NULL && IPC_ISRCONN(channel)) {
489 if (cluster_conn->llc_ops->msgready(cluster_conn) == 0) {
494 msg = cluster_conn->llc_ops->readmsg(cluster_conn, 0);
499 const char *msg_type = ha_msg_value(msg,
F_TYPE) ?:
"[type not set]";
500 const char *msg_to_id = ha_msg_value(msg, F_TOID);
502 crm_err(
"Ignored incoming message. Please set_msg_callback on %s", msg_type);
503 }
else if (msg_to_id) {
506 crm_notice(
"Ignored incoming message %s=%s %s=%s, please set_msg_callback",
507 F_TOID, msg_to_id,
F_TYPE, msg_type);
515 if (channel == NULL || channel->ch_status != IPC_CONNECT) {
516 crm_info(
"Lost connection to heartbeat service.");
527 const char *const_uuid = NULL;
528 const char *const_uname = NULL;
531 if (cluster->hb_conn->llc_ops->signon(cluster->hb_conn,
crm_system_name) != HA_OK) {
532 crm_err(
"Cannot sign on with heartbeat: %s",
533 cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn));
538 cluster->hb_conn->llc_ops->set_msg_callback(cluster->hb_conn,
crm_system_name,
539 cluster->hb_dispatch, cluster->hb_conn)) {
541 crm_err(
"Cannot set msg callback: %s", cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn));
546 GLLclusterSource *(*g_main_add_cluster) (
int priority, ll_cluster_t * api,
547 gboolean can_recurse,
548 gboolean(*dispatch) (ll_cluster_t * source_data,
550 gpointer userdata, GDestroyNotify notify) =
553 (*g_main_add_cluster) (G_PRIORITY_HIGH, cluster->hb_conn,
554 FALSE, ha_msg_dispatch, cluster->hb_conn, cluster->
destroy);
558 const_uname = cluster->hb_conn->llc_ops->get_mynodeid(cluster->hb_conn);
559 CRM_CHECK(const_uname != NULL,
return FALSE);
564 CRM_CHECK(const_uuid != NULL,
return FALSE);
566 crm_info(
"Hostname: %s", const_uname);
569 cluster->
uname = strdup(const_uname);
570 cluster->
uuid = strdup(const_uuid);
576 ccm_have_quorum(oc_ed_t event)
578 if (event == OC_EV_MS_NEW_MEMBERSHIP || event == OC_EV_MS_PRIMARY_RESTORED) {
585 ccm_event_name(oc_ed_t event)
588 if (event == OC_EV_MS_NEW_MEMBERSHIP) {
589 return "NEW MEMBERSHIP";
591 }
else if (event == OC_EV_MS_NOT_PRIMARY) {
592 return "NOT PRIMARY";
594 }
else if (event == OC_EV_MS_PRIMARY_RESTORED) {
595 return "PRIMARY RESTORED";
597 }
else if (event == OC_EV_MS_EVICTED) {
600 }
else if (event == OC_EV_MS_INVALID) {
604 return "NO QUORUM MEMBERSHIP";
609 heartbeat_initialize_nodelist(
void *cluster, gboolean force_member, xmlNode * xml_parent)
611 const char *ha_node = NULL;
612 ll_cluster_t *conn = cluster;
620 crm_info(
"Requesting the list of configured nodes");
621 conn->llc_ops->init_nodewalk(conn);
624 xmlNode *node = NULL;
626 const char *ha_node_type = NULL;
627 const char *ha_node_uuid = NULL;
629 ha_node = conn->llc_ops->nextnode(conn);
630 if (ha_node == NULL) {
634 ha_node_type = conn->llc_ops->node_type(conn, ha_node);
636 crm_debug(
"Node %s: skipping '%s'", ha_node, ha_node_type);
643 if (ha_node_uuid == NULL) {
644 crm_warn(
"Node %s: no uuid found", ha_node);
648 crm_debug(
"Node: %s (uuid: %s)", ha_node, ha_node_uuid);
654 }
while (ha_node != NULL);
656 conn->llc_ops->end_nodewalk(conn);
#define CRM_CHECK(expr, failure_action)
void * find_library_function(void **handle, const char *lib, const char *fn, int fatal)
#define crm_notice(fmt, args...)
gboolean safe_str_neq(const char *a, const char *b)
void(* destroy)(gpointer)
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
#define CRM_LOG_ASSERT(expr)
xmlNode * string2xml(const char *input)
crm_node_t * crm_update_peer(const char *source, unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, const char *addr, const char *state)
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
#define crm_trace(fmt, args...)
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
xmlNode * add_node_copy(xmlNode *new_parent, xmlNode *xml_node)
xmlNode * create_xml_node(xmlNode *parent, const char *name)
#define crm_log_xml_warn(xml, text)
void free_xml(xmlNode *child)
#define DAEMON_RESPAWN_STOP
const char * crm_xml_add(xmlNode *node, const char *name, const char *value)
#define crm_err(fmt, args...)
#define CRM_BZ2_THRESHOLD
char * dump_xml_unformatted(xmlNode *msg)
Wrappers for and extensions to libqb IPC.
int add_node_nocopy(xmlNode *parent, const char *name, xmlNode *child)
#define crm_log_xml_trace(xml, text)
#define safe_str_eq(a, b)
#define crm_info(fmt, args...)
const char * crm_peer_uuid(crm_node_t *node)
enum crm_ais_msg_types type