This source file includes following definitions.
- convert_ha_field
- convert_ha_message
- add_ha_nocopy
- convert_xml_message_struct
- convert_xml_child
- convert_xml_message
- crm_is_heartbeat_peer_active
- crm_update_ccm_node
- send_ha_message
- ha_msg_dispatch
- register_heartbeat_conn
- ccm_have_quorum
- ccm_event_name
- heartbeat_initialize_nodelist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <crm_internal.h>
20 #include <dlfcn.h>
21
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include <string.h>
27 #include <stdlib.h>
28 #include <time.h>
29
30 #include <crm/crm.h>
31 #include <crm/msg_xml.h>
32
33 #include <crm/common/ipc.h>
34 #include <crm/cluster/internal.h>
35
36 #if HAVE_BZLIB_H
37 # include <bzlib.h>
38 #endif
39
40 #if SUPPORT_HEARTBEAT
41 ll_cluster_t *heartbeat_cluster = NULL;
42
43 static void
44 convert_ha_field(xmlNode * parent, void *msg_v, int lpc)
45 {
46 int type = 0;
47 const char *name = NULL;
48 const char *value = NULL;
49 xmlNode *xml = NULL;
50 HA_Message *msg = msg_v;
51
52 int rc = BZ_OK;
53 size_t orig_len = 0;
54 unsigned int used = 0;
55 char *uncompressed = NULL;
56 char *compressed = NULL;
57 int size = orig_len * 10;
58
59 CRM_CHECK(parent != NULL, return);
60 CRM_CHECK(msg != NULL, return);
61
62 name = msg->names[lpc];
63 type = cl_get_type(msg, name);
64
65 switch (type) {
66 case FT_STRUCT:
67 convert_ha_message(parent, msg->values[lpc], name);
68 break;
69 case FT_COMPRESS:
70 case FT_UNCOMPRESS:
71 convert_ha_message(parent, cl_get_struct(msg, name), name);
72 break;
73 case FT_STRING:
74 value = msg->values[lpc];
75 CRM_CHECK(value != NULL, return);
76 crm_trace("Converting %s/%d/%s", name, type, value[0] == '<' ? "xml" : "field");
77
78 if (value[0] != '<') {
79 crm_xml_add(parent, name, value);
80 break;
81 }
82
83
84 xml = string2xml(value);
85 if (xml == NULL) {
86 crm_err("Conversion of field '%s' failed", name);
87 return;
88 }
89
90 add_node_nocopy(parent, NULL, xml);
91 break;
92
93 case FT_BINARY:
94 value = cl_get_binary(msg, name, &orig_len);
95 size = orig_len * 10 + 1;
96
97 if (orig_len < 3 || value[0] != 'B' || value[1] != 'Z' || value[2] != 'h') {
98 if (strstr(name, "uuid") == NULL) {
99 crm_err("Skipping non-bzip binary field: %s", name);
100 }
101 return;
102 }
103
104 compressed = calloc(1, orig_len);
105 memcpy(compressed, value, orig_len);
106
107 crm_trace("Trying to decompress %d bytes", (int)orig_len);
108 retry:
109 uncompressed = realloc_safe(uncompressed, size);
110 memset(uncompressed, 0, size);
111 used = size - 1;
112
113
114
115
116 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &used, compressed, orig_len, 1, 0);
117
118 if (rc == BZ_OUTBUFF_FULL) {
119 size = size * 2;
120
121 if (size > 0) {
122 goto retry;
123 }
124 }
125
126 if (rc != BZ_OK) {
127 crm_err("Decompression of %s (%d bytes) into %d failed: %d",
128 name, (int)orig_len, size, rc);
129
130 } else if (used >= size) {
131 CRM_ASSERT(used < size);
132
133 } else {
134 CRM_LOG_ASSERT(uncompressed[used] == 0);
135 uncompressed[used] = 0;
136 xml = string2xml(uncompressed);
137 }
138
139 if (xml != NULL) {
140 add_node_copy(parent, xml);
141 free_xml(xml);
142 }
143
144 free(uncompressed);
145 free(compressed);
146 break;
147 }
148 }
149
150 xmlNode *
151 convert_ha_message(xmlNode * parent, HA_Message * msg, const char *field)
152 {
153 int lpc = 0;
154 xmlNode *child = NULL;
155 const char *tag = NULL;
156
157 CRM_CHECK(msg != NULL, crm_err("Empty message for %s", field);
158 return parent);
159
160 tag = cl_get_string(msg, F_XML_TAGNAME);
161 if (tag == NULL) {
162 tag = field;
163
164 } else if (parent && safe_str_neq(field, tag)) {
165
166 crm_debug("Creating intermediate parent %s between %s and %s", field,
167 crm_element_name(parent), tag);
168 parent = create_xml_node(parent, field);
169 }
170
171 if (parent == NULL) {
172 parent = create_xml_node(NULL, tag);
173 child = parent;
174
175 } else {
176 child = create_xml_node(parent, tag);
177 }
178
179 for (lpc = 0; lpc < msg->nfields; lpc++) {
180 convert_ha_field(child, msg, lpc);
181 }
182
183 return parent;
184 }
185
186 static void
187 add_ha_nocopy(HA_Message * parent, HA_Message * child, const char *field)
188 {
189 int next = parent->nfields;
190
191 if (parent->nfields >= parent->nalloc && ha_msg_expand(parent) != HA_OK) {
192 crm_err("Parent expansion failed");
193 return;
194 }
195
196 parent->names[next] = strdup(field);
197 parent->nlens[next] = strlen(field);
198 parent->values[next] = child;
199 parent->vlens[next] = sizeof(HA_Message);
200 parent->types[next] = FT_UNCOMPRESS;
201 parent->nfields++;
202 }
203
204 static HA_Message *
205 convert_xml_message_struct(HA_Message * parent, xmlNode * src_node, const char *field)
206 {
207 xmlNode *child = NULL;
208 xmlNode *__crm_xml_iter = src_node->children;
209 xmlAttrPtr prop_iter = src_node->properties;
210 const char *name = NULL;
211 const char *value = NULL;
212
213 HA_Message *result = ha_msg_new(3);
214
215 ha_msg_add(result, F_XML_TAGNAME, (const char *)src_node->name);
216
217 while (prop_iter != NULL) {
218 name = (const char *)prop_iter->name;
219 value = (const char *)xmlGetProp(src_node, prop_iter->name);
220 prop_iter = prop_iter->next;
221 ha_msg_add(result, name, value);
222 }
223
224 while (__crm_xml_iter != NULL) {
225 child = __crm_xml_iter;
226 __crm_xml_iter = __crm_xml_iter->next;
227 convert_xml_message_struct(result, child, NULL);
228 }
229
230 if (parent == NULL) {
231 return result;
232 }
233
234 if (field) {
235 HA_Message *holder = ha_msg_new(3);
236
237 CRM_ASSERT(holder != NULL);
238
239 ha_msg_add(holder, F_XML_TAGNAME, field);
240 add_ha_nocopy(holder, result, (const char *)src_node->name);
241
242 ha_msg_addstruct_compress(parent, field, holder);
243 ha_msg_del(holder);
244
245 } else {
246 add_ha_nocopy(parent, result, (const char *)src_node->name);
247 }
248 return result;
249 }
250
251 static void
252 convert_xml_child(HA_Message * msg, xmlNode * xml)
253 {
254 int orig = 0;
255 int rc = BZ_OK;
256 unsigned int len = 0;
257
258 char *buffer = NULL;
259 char *compressed = NULL;
260 const char *name = NULL;
261
262 name = (const char *)xml->name;
263 buffer = dump_xml_unformatted(xml);
264 orig = strlen(buffer);
265 if (orig < CRM_BZ2_THRESHOLD) {
266 ha_msg_add(msg, name, buffer);
267 goto done;
268 }
269
270 len = (orig * 1.1) + 600;
271
272 compressed = malloc(len);
273 rc = BZ2_bzBuffToBuffCompress(compressed, &len, buffer, orig, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK);
274
275 if (rc != BZ_OK) {
276 crm_err("Compression failed: %d", rc);
277 free(compressed);
278 convert_xml_message_struct(msg, xml, name);
279 goto done;
280 }
281
282 free(buffer);
283 buffer = compressed;
284 crm_trace("Compression details: %d -> %d", orig, len);
285 ha_msg_addbin(msg, name, buffer, len);
286 done:
287 free(buffer);
288
289 # if 0
290 {
291 unsigned int used = orig;
292 char *uncompressed = NULL;
293
294 crm_debug("Trying to decompress %d bytes", len);
295 uncompressed = calloc(1, orig);
296 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &used, compressed, len, 1, 0);
297 CRM_CHECK(rc == BZ_OK,;
298 );
299 CRM_CHECK(used == orig,;
300 );
301 crm_debug("rc=%d, used=%d", rc, used);
302 if (rc != BZ_OK) {
303 crm_exit(DAEMON_RESPAWN_STOP);
304 }
305 crm_debug("Original %s, decompressed %s", buffer, uncompressed);
306 free(uncompressed);
307 }
308 # endif
309 }
310
311 static HA_Message *
312 convert_xml_message(xmlNode * xml)
313 {
314 xmlNode *child = NULL;
315 xmlAttrPtr pIter = NULL;
316 HA_Message *result = NULL;
317
318 result = ha_msg_new(3);
319 ha_msg_add(result, F_XML_TAGNAME, (const char *)xml->name);
320
321 for (pIter = xml->properties; pIter != NULL; pIter = pIter->next) {
322 const char *p_name = (const char *)pIter->name;
323
324 if (pIter->children) {
325 const char *p_value = (const char *)pIter->children->content;
326
327 ha_msg_add(result, p_name, p_value);
328 }
329 }
330 for (child = __xml_first_child(xml); child != NULL; child = __xml_next(child)) {
331 convert_xml_child(result, child);
332 }
333
334 return result;
335 }
336
337 gboolean
338 crm_is_heartbeat_peer_active(const crm_node_t * node)
339 {
340 enum crm_proc_flag proc = text2proc(crm_system_name);
341
342 if (node == NULL) {
343 crm_trace("NULL");
344 return FALSE;
345
346 } else if (safe_str_neq(node->state, CRM_NODE_MEMBER)) {
347 crm_trace("%s: state=%s", node->uname, node->state);
348 return FALSE;
349
350 } else if ((node->processes & crm_proc_heartbeat) == 0) {
351 crm_trace("%s: processes=%.16x", node->uname, node->processes);
352 return FALSE;
353
354 } else if (proc == crm_proc_none) {
355 return TRUE;
356
357 } else if ((node->processes & proc) == 0) {
358 crm_trace("%s: proc %.16x not in %.16x", node->uname, proc, node->processes);
359 return FALSE;
360 }
361 return TRUE;
362 }
363
364 crm_node_t *
365 crm_update_ccm_node(const oc_ev_membership_t * oc, int offset, const char *state, uint64_t seq)
366 {
367 enum crm_proc_flag this_proc = text2proc(crm_system_name);
368 crm_node_t *peer = NULL;
369 const char *uuid = NULL;
370
371 CRM_CHECK(oc->m_array[offset].node_uname != NULL, return NULL);
372
373 peer = crm_get_peer(0, oc->m_array[offset].node_uname);
374 uuid = crm_peer_uuid(peer);
375
376 peer = crm_update_peer(__FUNCTION__, oc->m_array[offset].node_id,
377 oc->m_array[offset].node_born_on, seq, -1, 0,
378 uuid, oc->m_array[offset].node_uname, NULL, state);
379 if (peer == NULL) {
380 return NULL;
381 }
382
383 if (safe_str_eq(CRM_NODE_MEMBER, state)) {
384
385
386
387
388
389
390 enum crm_proc_flag flags = crm_proc_heartbeat;
391 const char *const_uname = heartbeat_cluster->llc_ops->get_mynodeid(heartbeat_cluster);
392 if (safe_str_eq(const_uname, peer->uname)) {
393 flags |= this_proc;
394 }
395 peer = crm_update_peer_proc(__FUNCTION__, peer, flags, ONLINESTATUS);
396 } else {
397
398
399 peer = crm_update_peer_proc(__FUNCTION__, peer, this_proc, OFFLINESTATUS);
400 }
401 return peer;
402 }
403
404 gboolean
405 send_ha_message(ll_cluster_t * hb_conn, xmlNode * xml, const char *node, gboolean force_ordered)
406 {
407 gboolean all_is_good = TRUE;
408 HA_Message *msg = convert_xml_message(xml);
409
410 if (msg == NULL) {
411 crm_err("can't send NULL message");
412 all_is_good = FALSE;
413
414 } else if (hb_conn == NULL) {
415 crm_err("No heartbeat connection specified");
416 all_is_good = FALSE;
417
418 } else if (hb_conn->llc_ops->chan_is_connected(hb_conn) == FALSE) {
419 crm_err("Not connected to Heartbeat");
420 all_is_good = FALSE;
421
422 } else if (node != NULL) {
423 char *host_lowercase = g_ascii_strdown(node, -1);
424
425 if (hb_conn->llc_ops->send_ordered_nodemsg(hb_conn, msg, host_lowercase) != HA_OK) {
426 all_is_good = FALSE;
427 crm_err("Send failed");
428 }
429 free(host_lowercase);
430
431 } else if (force_ordered) {
432 if (hb_conn->llc_ops->send_ordered_clustermsg(hb_conn, msg) != HA_OK) {
433 all_is_good = FALSE;
434 crm_err("Broadcast Send failed");
435 }
436
437 } else {
438 if (hb_conn->llc_ops->sendclustermsg(hb_conn, msg) != HA_OK) {
439 all_is_good = FALSE;
440 crm_err("Broadcast Send failed");
441 }
442 }
443
444 if (all_is_good == FALSE && hb_conn != NULL) {
445 IPC_Channel *ipc = NULL;
446 IPC_Queue *send_q = NULL;
447
448 if (hb_conn->llc_ops->chan_is_connected(hb_conn) != HA_OK) {
449 ipc = hb_conn->llc_ops->ipcchan(hb_conn);
450 }
451 if (ipc != NULL) {
452
453 send_q = ipc->send_queue;
454 }
455 if (send_q != NULL) {
456 CRM_CHECK(send_q->current_qlen < send_q->max_qlen,;
457 );
458 }
459 }
460
461 if (all_is_good) {
462 crm_log_xml_trace(xml, "outbound");
463 } else {
464 crm_log_xml_warn(xml, "outbound");
465 }
466
467 if (msg != NULL) {
468 ha_msg_del(msg);
469 }
470 return all_is_good;
471 }
472
473 gboolean
474 ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data)
475 {
476 IPC_Channel *channel = NULL;
477
478 crm_trace("Invoked");
479
480 if (cluster_conn != NULL) {
481 channel = cluster_conn->llc_ops->ipcchan(cluster_conn);
482 }
483
484 CRM_CHECK(cluster_conn != NULL, return FALSE);
485 CRM_CHECK(channel != NULL, return FALSE);
486
487 if (channel != NULL && IPC_ISRCONN(channel)) {
488 struct ha_msg *msg;
489 if (cluster_conn->llc_ops->msgready(cluster_conn) == 0) {
490 crm_trace("no message ready yet");
491 }
492
493
494 msg = cluster_conn->llc_ops->readmsg(cluster_conn, 0);
495 if (msg) {
496
497
498
499 const char *msg_type = ha_msg_value(msg, F_TYPE) ?: "[type not set]";
500 const char *msg_to_id = ha_msg_value(msg, F_TOID);
501 if (safe_str_eq(msg_to_id, crm_system_name)) {
502 crm_err("Ignored incoming message. Please set_msg_callback on %s", msg_type);
503 } else if (msg_to_id) {
504
505
506 crm_notice("Ignored incoming message %s=%s %s=%s, please set_msg_callback",
507 F_TOID, msg_to_id, F_TYPE, msg_type);
508 } else {
509 crm_debug("Ignored incoming message %s=%s", F_TYPE, msg_type);
510 }
511 ha_msg_del(msg);
512 }
513 }
514
515 if (channel == NULL || channel->ch_status != IPC_CONNECT) {
516 crm_info("Lost connection to heartbeat service.");
517 return FALSE;
518 }
519
520 return TRUE;
521 }
522
523 gboolean
524 register_heartbeat_conn(crm_cluster_t * cluster)
525 {
526 crm_node_t *peer = NULL;
527 const char *const_uuid = NULL;
528 const char *const_uname = NULL;
529
530 crm_debug("Signing in with Heartbeat");
531 if (cluster->hb_conn->llc_ops->signon(cluster->hb_conn, crm_system_name) != HA_OK) {
532 crm_err("Cannot sign on with heartbeat: %s",
533 cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn));
534 return FALSE;
535 }
536
537 if (HA_OK !=
538 cluster->hb_conn->llc_ops->set_msg_callback(cluster->hb_conn, crm_system_name,
539 cluster->hb_dispatch, cluster->hb_conn)) {
540
541 crm_err("Cannot set msg callback: %s", cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn));
542 return FALSE;
543
544 } else {
545 void *handle = NULL;
546 GLLclusterSource *(*g_main_add_cluster) (int priority, ll_cluster_t * api,
547 gboolean can_recurse,
548 gboolean(*dispatch) (ll_cluster_t * source_data,
549 gpointer user_data),
550 gpointer userdata, GDestroyNotify notify) =
551 find_library_function(&handle, HEARTBEAT_LIBRARY, "G_main_add_ll_cluster", 1);
552
553 (*g_main_add_cluster) (G_PRIORITY_HIGH, cluster->hb_conn,
554 FALSE, ha_msg_dispatch, cluster->hb_conn, cluster->destroy);
555 dlclose(handle);
556 }
557
558 const_uname = cluster->hb_conn->llc_ops->get_mynodeid(cluster->hb_conn);
559 CRM_CHECK(const_uname != NULL, return FALSE);
560
561 peer = crm_get_peer(0, const_uname);
562 const_uuid = crm_peer_uuid(peer);
563
564 CRM_CHECK(const_uuid != NULL, return FALSE);
565
566 crm_info("Hostname: %s", const_uname);
567 crm_info("UUID: %s", const_uuid);
568
569 cluster->uname = strdup(const_uname);
570 cluster->uuid = strdup(const_uuid);
571
572 return TRUE;
573 }
574
575 gboolean
576 ccm_have_quorum(oc_ed_t event)
577 {
578 if (event == OC_EV_MS_NEW_MEMBERSHIP || event == OC_EV_MS_PRIMARY_RESTORED) {
579 return TRUE;
580 }
581 return FALSE;
582 }
583
584 const char *
585 ccm_event_name(oc_ed_t event)
586 {
587
588 if (event == OC_EV_MS_NEW_MEMBERSHIP) {
589 return "NEW MEMBERSHIP";
590
591 } else if (event == OC_EV_MS_NOT_PRIMARY) {
592 return "NOT PRIMARY";
593
594 } else if (event == OC_EV_MS_PRIMARY_RESTORED) {
595 return "PRIMARY RESTORED";
596
597 } else if (event == OC_EV_MS_EVICTED) {
598 return "EVICTED";
599
600 } else if (event == OC_EV_MS_INVALID) {
601 return "INVALID";
602 }
603
604 return "NO QUORUM MEMBERSHIP";
605
606 }
607
608 gboolean
609 heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent)
610 {
611 const char *ha_node = NULL;
612 ll_cluster_t *conn = cluster;
613
614 if (conn == NULL) {
615 crm_debug("Not connected");
616 return FALSE;
617 }
618
619
620 crm_info("Requesting the list of configured nodes");
621 conn->llc_ops->init_nodewalk(conn);
622
623 do {
624 xmlNode *node = NULL;
625 crm_node_t *peer = NULL;
626 const char *ha_node_type = NULL;
627 const char *ha_node_uuid = NULL;
628
629 ha_node = conn->llc_ops->nextnode(conn);
630 if (ha_node == NULL) {
631 continue;
632 }
633
634 ha_node_type = conn->llc_ops->node_type(conn, ha_node);
635 if (safe_str_neq(NORMALNODE, ha_node_type)) {
636 crm_debug("Node %s: skipping '%s'", ha_node, ha_node_type);
637 continue;
638 }
639
640 peer = crm_get_peer(0, ha_node);
641 ha_node_uuid = crm_peer_uuid(peer);
642
643 if (ha_node_uuid == NULL) {
644 crm_warn("Node %s: no uuid found", ha_node);
645 continue;
646 }
647
648 crm_debug("Node: %s (uuid: %s)", ha_node, ha_node_uuid);
649 node = create_xml_node(xml_parent, XML_CIB_TAG_NODE);
650 crm_xml_add(node, XML_ATTR_ID, ha_node_uuid);
651 crm_xml_add(node, XML_ATTR_UNAME, ha_node);
652 crm_xml_add(node, XML_ATTR_TYPE, ha_node_type);
653
654 } while (ha_node != NULL);
655
656 conn->llc_ops->end_nodewalk(conn);
657 return TRUE;
658 }
659
660 #endif