pacemaker 3.0.1-16e74fc4da
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
cpg.c
Go to the documentation of this file.
1/*
2 * Copyright 2004-2025 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
11
12#include <arpa/inet.h>
13#include <inttypes.h> // PRIu32
14#include <netdb.h>
15#include <netinet/in.h>
16#include <stdbool.h>
17#include <stdint.h> // uint32_t
18#include <sys/socket.h>
19#include <sys/types.h> // size_t
20#include <sys/utsname.h>
21
22#include <bzlib.h>
23#include <corosync/corodefs.h>
24#include <corosync/corotypes.h>
25#include <corosync/hdb.h>
26#include <corosync/cpg.h>
27#include <qb/qbipc_common.h>
28#include <qb/qbipcc.h>
29#include <qb/qbutil.h>
30
32#include <crm/common/ipc.h>
33#include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
34#include <crm/common/mainloop.h>
35#include <crm/common/xml.h>
36
37#include "crmcluster_private.h"
38
39/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
40 * functions, we can ditch this in favor of cluster->cpg_handle.
41 */
42static cpg_handle_t pcmk_cpg_handle = 0;
43
44// @TODO These could be moved to pcmk_cluster_t* at that time as well
45static bool cpg_evicted = false;
46static GList *cs_message_queue = NULL;
47static int cs_message_timer = 0;
48
49/* @COMPAT Any changes to these structs (other than renames) will break all
50 * rolling upgrades, and should be avoided if possible or done at a major
51 * version bump if not
52 */
53
54struct pcmk__cpg_host_s {
55 uint32_t id;
56 uint32_t pid;
57 gboolean local; // Unused but needed for compatibility
58 enum pcmk_ipc_server type; // For logging only
59 uint32_t size;
60 char uname[MAX_NAME];
61} __attribute__ ((packed));
62
63typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
64
65struct pcmk__cpg_msg_s {
66 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
67 uint32_t id;
68 gboolean is_compressed;
69
72
73 uint32_t size;
74 uint32_t compressed_size;
75 /* 584 bytes */
76 char data[0];
77
78} __attribute__ ((packed));
79
80typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
81
82static void crm_cs_flush(gpointer data);
83
/* Number of payload bytes actually carried by a CPG message: the compressed
 * size if the message is compressed, otherwise the plain size. The argument is
 * parenthesized so that any pointer expression may be passed.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
85
/* Evaluate a Corosync call, retrying while it reports CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL.
 *
 * rc      - lvalue that receives the result of each attempt
 * counter - lvalue counting failed attempts; also the number of seconds slept
 *           before the next attempt
 * max     - stop retrying once counter reaches this value
 * code    - expression to evaluate
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
96
105uint32_t
106pcmk__cpg_local_nodeid(cpg_handle_t handle)
107{
108 cs_error_t rc = CS_OK;
109 int retries = 0;
110 static uint32_t local_nodeid = 0;
111 cpg_handle_t local_handle = handle;
112 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
113 int fd = -1;
114 uid_t found_uid = 0;
115 gid_t found_gid = 0;
116 pid_t found_pid = 0;
117 int rv = 0;
118
119 if (local_nodeid != 0) {
120 return local_nodeid;
121 }
122
123 if (handle == 0) {
124 crm_trace("Creating connection");
125 cs_repeat(rc, retries, 5,
126 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
127 (cpg_model_data_t *) &cpg_model_info,
128 NULL));
129 if (rc != CS_OK) {
130 crm_err("Could not connect to the CPG API: %s (%d)",
131 cs_strerror(rc), rc);
132 return 0;
133 }
134
135 rc = cpg_fd_get(local_handle, &fd);
136 if (rc != CS_OK) {
137 crm_err("Could not obtain the CPG API connection: %s (%d)",
138 cs_strerror(rc), rc);
139 goto bail;
140 }
141
142 // CPG provider run as root (at least in given user namespace)?
143 rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
144 &found_uid, &found_gid);
145 if (rv == 0) {
146 crm_err("CPG provider is not authentic:"
147 " process %lld (uid: %lld, gid: %lld)",
148 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
149 (long long) found_uid, (long long) found_gid);
150 goto bail;
151
152 } else if (rv < 0) {
153 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
154 strerror(-rv), -rv);
155 goto bail;
156 }
157 }
158
159 if (rc == CS_OK) {
160 retries = 0;
161 crm_trace("Performing lookup");
162 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
163 }
164
165 if (rc != CS_OK) {
166 crm_err("Could not get local node id from the CPG API: %s (%d)",
167 pcmk__cs_err_str(rc), rc);
168 }
169
170bail:
171 if (handle == 0) {
172 crm_trace("Closing connection");
173 cpg_finalize(local_handle);
174 }
175 crm_debug("Local nodeid is %u", local_nodeid);
176 return local_nodeid;
177}
178
187static gboolean
188crm_cs_flush_cb(gpointer data)
189{
190 cs_message_timer = 0;
191 crm_cs_flush(data);
192 return FALSE;
193}
194
195// Send no more than this many CPG messages in one flush
196#define CS_SEND_MAX 200
197
204static void
205crm_cs_flush(gpointer data)
206{
207 unsigned int sent = 0;
208 guint queue_len = 0;
209 cs_error_t rc = 0;
210 cpg_handle_t *handle = (cpg_handle_t *) data;
211
212 if (*handle == 0) {
213 crm_trace("Connection is dead");
214 return;
215 }
216
217 queue_len = g_list_length(cs_message_queue);
218 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
219 crm_err("CPG queue has grown to %d", queue_len);
220
221 } else if (queue_len == CS_SEND_MAX) {
222 crm_warn("CPG queue has grown to %d", queue_len);
223 }
224
225 if (cs_message_timer != 0) {
226 /* There is already a timer, wait until it goes off */
227 crm_trace("Timer active %d", cs_message_timer);
228 return;
229 }
230
231 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
232 struct iovec *iov = cs_message_queue->data;
233
234 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
235 if (rc != CS_OK) {
236 break;
237 }
238
239 sent++;
240 crm_trace("CPG message sent, size=%zu", iov->iov_len);
241
242 cs_message_queue = g_list_remove(cs_message_queue, iov);
243 free(iov->iov_base);
244 free(iov);
245 }
246
247 queue_len -= sent;
248 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
249 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
250 sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
251 (int) rc);
252
253 if (cs_message_queue) {
254 uint32_t delay_ms = 100;
255 if (rc != CS_OK) {
256 /* Proportionally more if sending failed but cap at 1s */
257 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
258 }
259 cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
260 }
261}
262
271static int
272pcmk_cpg_dispatch(gpointer user_data)
273{
274 cs_error_t rc = CS_OK;
275 pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
276
277 rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
278 if (rc != CS_OK) {
279 crm_err("Connection to the CPG API failed: %s (%d)",
280 pcmk__cs_err_str(rc), rc);
281 cpg_finalize(cluster->priv->cpg_handle);
282 cluster->priv->cpg_handle = 0;
283 return -1;
284
285 } else if (cpg_evicted) {
286 crm_err("Evicted from CPG membership");
287 return -1;
288 }
289 return 0;
290}
291
292static inline const char *
293ais_dest(const pcmk__cpg_host_t *host)
294{
295 return (host->size > 0)? host->uname : "<all>";
296}
297
298static inline const char *
299msg_type2text(enum pcmk_ipc_server type)
300{
301 const char *name = pcmk__server_message_type(type);
302
303 return pcmk__s(name, "unknown");
304}
305
314static bool
315check_message_sanity(const pcmk__cpg_msg_t *msg)
316{
317 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
318
319 if (payload_size < 1) {
320 crm_err("%sCPG message %d from %s invalid: "
321 "Claimed size of %d bytes is too small "
322 QB_XS " from %s[%u] to %s@%s",
323 (msg->is_compressed? "Compressed " : ""),
324 msg->id, ais_dest(&(msg->sender)),
325 (int) msg->header.size,
326 msg_type2text(msg->sender.type), msg->sender.pid,
327 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
328 return false;
329 }
330
331 if (msg->header.error != CS_OK) {
332 crm_err("%sCPG message %d from %s invalid: "
333 "Sender indicated error %d "
334 QB_XS " from %s[%u] to %s@%s",
335 (msg->is_compressed? "Compressed " : ""),
336 msg->id, ais_dest(&(msg->sender)),
337 msg->header.error,
338 msg_type2text(msg->sender.type), msg->sender.pid,
339 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
340 return false;
341 }
342
343 if (msg_data_len(msg) != payload_size) {
344 crm_err("%sCPG message %d from %s invalid: "
345 "Total size %d inconsistent with payload size %d "
346 QB_XS " from %s[%u] to %s@%s",
347 (msg->is_compressed? "Compressed " : ""),
348 msg->id, ais_dest(&(msg->sender)),
349 (int) msg->header.size, (int) msg_data_len(msg),
350 msg_type2text(msg->sender.type), msg->sender.pid,
351 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
352 return false;
353 }
354
355 if (!msg->is_compressed &&
356 /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
357 * but checking the last byte or two should be quick
358 */
359 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
360 || (msg->data[msg->size - 1] != '\0'))) {
361 crm_err("CPG message %d from %s invalid: "
362 "Payload does not end at byte %" PRIu32 " "
363 QB_XS " from %s[%u] to %s@%s",
364 msg->id, ais_dest(&(msg->sender)), msg->size,
365 msg_type2text(msg->sender.type), msg->sender.pid,
366 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
367 return false;
368 }
369
370 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
371 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
372 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
373 ais_dest(&(msg->sender)),
374 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
375 return true;
376}
377
394char *
395pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
396 void *content, const char **from)
397{
398 char *data = NULL;
399 pcmk__cpg_msg_t *msg = content;
400
401 if (from != NULL) {
402 *from = NULL;
403 }
404
405 if (handle != 0) {
406 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
407 const char *local_name = pcmk__cluster_local_node_name();
408
409 // Update or validate message sender ID
410 if (msg->sender.id == 0) {
411 msg->sender.id = sender_id;
412 } else if (msg->sender.id != sender_id) {
413 crm_warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
414 ": claimed ID %" PRIu32,
415 sender_id, pid, msg->sender.id);
416 return NULL;
417 }
418
419 // Ignore messages that aren't for the local node
420 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
421 crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
422 ": for ID %" PRIu32 " not %" PRIu32,
423 sender_id, pid, msg->host.id, local_nodeid);
424 return NULL;
425 }
426 if ((msg->host.size > 0)
427 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
428
429 crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
430 ": for name %s not %s",
431 sender_id, pid, msg->host.uname, local_name);
432 return NULL;
433 }
434
435 // Add sender name if not in original message
436 if (msg->sender.size == 0) {
437 const pcmk__node_status_t *peer =
438 pcmk__get_node(sender_id, NULL, NULL,
440
441 if (peer->name == NULL) {
442 crm_debug("Received CPG message from node with ID %" PRIu32
443 " but its name is unknown", sender_id);
444 } else {
445 crm_debug("Updating name of CPG message sender with ID %" PRIu32
446 " to %s", sender_id, peer->name);
447 msg->sender.size = strlen(peer->name);
448 memset(msg->sender.uname, 0, MAX_NAME);
449 memcpy(msg->sender.uname, peer->name, msg->sender.size);
450 }
451 }
452 }
453
454 // Ensure sender is in peer cache (though it should already be)
455 pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
457
458 if (from != NULL) {
459 *from = msg->sender.uname;
460 }
461
462 if (!check_message_sanity(msg)) {
463 return NULL;
464 }
465
466 if (msg->is_compressed && (msg->size > 0)) {
467 int rc = BZ_OK;
468 unsigned int new_size = msg->size + 1;
469 char *uncompressed = pcmk__assert_alloc(1, new_size);
470
471 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
472 msg->compressed_size, 1, 0);
473 rc = pcmk__bzlib2rc(rc);
474 if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug?
476 }
477 if (rc != pcmk_rc_ok) {
478 free(uncompressed);
479 crm_warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
480 " PID %" PRIu32 "): %s",
481 msg->id, ais_dest(&(msg->sender)), sender_id, pid,
482 pcmk_rc_str(rc));
483 return NULL;
484 }
485 data = uncompressed;
486
487 } else {
488 data = pcmk__str_copy(msg->data);
489 }
490
491 crm_trace("Received %sCPG message %d from %s (ID %" PRIu32
492 " PID %" PRIu32 "): %.40s...",
493 (msg->is_compressed? "compressed " : ""),
494 msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data);
495 return data;
496}
497
509static int
510cmp_member_list_nodeid(const void *first, const void *second)
511{
512 const struct cpg_address *const a = *((const struct cpg_address **) first),
513 *const b = *((const struct cpg_address **) second);
514 if (a->nodeid < b->nodeid) {
515 return -1;
516 } else if (a->nodeid > b->nodeid) {
517 return 1;
518 }
519 /* don't bother with "reason" nor "pid" */
520 return 0;
521}
522
531static const char *
532cpgreason2str(cpg_reason_t reason)
533{
534 switch (reason) {
535 case CPG_REASON_JOIN: return " via cpg_join";
536 case CPG_REASON_LEAVE: return " via cpg_leave";
537 case CPG_REASON_NODEDOWN: return " via cluster exit";
538 case CPG_REASON_NODEUP: return " via cluster join";
539 case CPG_REASON_PROCDOWN: return " for unknown reason";
540 default: break;
541 }
542 return "";
543}
544
553static inline const char *
554peer_name(const pcmk__node_status_t *peer)
555{
556 return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
557}
558
570static void
571node_left(const char *cpg_group_name, int event_counter,
572 uint32_t local_nodeid, const struct cpg_address *cpg_peer,
573 const struct cpg_address **sorted_member_list,
574 size_t member_list_entries)
575{
576 pcmk__node_status_t *peer =
577 pcmk__search_node_caches(cpg_peer->nodeid, NULL, NULL,
579 const struct cpg_address **rival = NULL;
580
581 /* Most CPG-related Pacemaker code assumes that only one process on a node
582 * can be in the process group, but Corosync does not impose this
583 * limitation, and more than one can be a member in practice due to a
584 * daemon attempting to start while another instance is already running.
585 *
586 * Check for any such duplicate instances, because we don't want to process
587 * their leaving as if our actual peer left. If the peer that left still has
588 * an entry in sorted_member_list (with a different PID), we will ignore the
589 * leaving.
590 *
591 * @TODO Track CPG members' PIDs so we can tell exactly who left.
592 */
593 if (peer != NULL) {
594 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
595 sizeof(const struct cpg_address *),
596 cmp_member_list_nodeid);
597 }
598
599 if (rival == NULL) {
600 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
601 cpg_group_name, event_counter, peer_name(peer),
602 cpg_peer->nodeid, cpg_peer->pid,
603 cpgreason2str(cpg_peer->reason));
604 if (peer != NULL) {
605 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
607 }
608 } else if (cpg_peer->nodeid == local_nodeid) {
609 crm_warn("Group %s event %d: duplicate local pid %u left%s",
610 cpg_group_name, event_counter,
611 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
612 } else {
613 crm_warn("Group %s event %d: "
614 "%s (node %u) duplicate pid %u left%s (%u remains)",
615 cpg_group_name, event_counter, peer_name(peer),
616 cpg_peer->nodeid, cpg_peer->pid,
617 cpgreason2str(cpg_peer->reason), (*rival)->pid);
618 }
619}
620
637void
638pcmk__cpg_confchg_cb(cpg_handle_t handle,
639 const struct cpg_name *group_name,
640 const struct cpg_address *member_list,
641 size_t member_list_entries,
642 const struct cpg_address *left_list,
643 size_t left_list_entries,
644 const struct cpg_address *joined_list,
645 size_t joined_list_entries)
646{
647 static int counter = 0;
648
649 bool found = false;
650 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
651 const struct cpg_address **sorted = NULL;
652
653 sorted = pcmk__assert_alloc(member_list_entries,
654 sizeof(const struct cpg_address *));
655
656 for (size_t iter = 0; iter < member_list_entries; iter++) {
657 sorted[iter] = member_list + iter;
658 }
659
660 // So that the cross-matching of multiply-subscribed nodes is then cheap
661 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
662 cmp_member_list_nodeid);
663
664 for (int i = 0; i < left_list_entries; i++) {
665 node_left(group_name->value, counter, local_nodeid, &left_list[i],
666 sorted, member_list_entries);
667 }
668 free(sorted);
669 sorted = NULL;
670
671 for (int i = 0; i < joined_list_entries; i++) {
672 crm_info("Group %s event %d: node %u pid %u joined%s",
673 group_name->value, counter, joined_list[i].nodeid,
674 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
675 }
676
677 for (int i = 0; i < member_list_entries; i++) {
678 pcmk__node_status_t *peer =
679 pcmk__get_node(member_list[i].nodeid, NULL, NULL,
681
682 if (member_list[i].nodeid == local_nodeid
683 && member_list[i].pid != getpid()) {
684 // See the note in node_left()
685 crm_warn("Group %s event %d: detected duplicate local pid %u",
686 group_name->value, counter, member_list[i].pid);
687 continue;
688 }
689 crm_info("Group %s event %d: %s (node %u pid %u) is member",
690 group_name->value, counter, peer_name(peer),
691 member_list[i].nodeid, member_list[i].pid);
692
693 /* If the caller left auto-reaping enabled, this will also update the
694 * state to member.
695 */
696 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
698
699 if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
700 /* The node is a CPG member, but we currently think it's not a
701 * cluster member. This is possible only if auto-reaping was
702 * disabled. The node may be joining, and we happened to get the CPG
703 * notification before the quorum notification; or the node may have
704 * just died, and we are processing its final messages; or a bug
705 * has affected the peer cache.
706 */
707 time_t now = time(NULL);
708
709 if (peer->when_lost == 0) {
710 // Track when we first got into this contradictory state
711 peer->when_lost = now;
712
713 } else if (now > (peer->when_lost + 60)) {
714 // If it persists for more than a minute, update the state
715 crm_warn("Node %u is member of group %s but was believed "
716 "offline",
717 member_list[i].nodeid, group_name->value);
718 pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
719 }
720 }
721
722 if (local_nodeid == member_list[i].nodeid) {
723 found = true;
724 }
725 }
726
727 if (!found) {
728 crm_err("Local node was evicted from group %s", group_name->value);
729 cpg_evicted = true;
730 }
731
732 counter++;
733}
734
743int
744pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
745{
746 if (cluster == NULL) {
747 return EINVAL;
748 }
749 cluster->cpg.cpg_deliver_fn = fn;
750 return pcmk_rc_ok;
751}
752
761int
762pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
763{
764 if (cluster == NULL) {
765 return EINVAL;
766 }
767 cluster->cpg.cpg_confchg_fn = fn;
768 return pcmk_rc_ok;
769}
770
778int
780{
781 cs_error_t rc;
782 int fd = -1;
783 int retries = 0;
784 uint32_t id = 0;
785 pcmk__node_status_t *peer = NULL;
786 cpg_handle_t handle = 0;
787 const char *cpg_group_name = NULL;
788 uid_t found_uid = 0;
789 gid_t found_gid = 0;
790 pid_t found_pid = 0;
791 int rv;
792
793 struct mainloop_fd_callbacks cpg_fd_callbacks = {
794 .dispatch = pcmk_cpg_dispatch,
795 .destroy = cluster->destroy,
796 };
797
798 cpg_model_v1_data_t cpg_model_info = {
799 .model = CPG_MODEL_V1,
800 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
801 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
802 .cpg_totem_confchg_fn = NULL,
803 .flags = 0,
804 };
805
806 cpg_evicted = false;
807
808 if (cluster->priv->server != pcmk_ipc_unknown) {
809 cpg_group_name = pcmk__server_message_type(cluster->priv->server);
810 }
811
812 if (cpg_group_name == NULL) {
813 /* The name will already be non-NULL for Pacemaker servers. If a
814 * command-line tool or external caller connects to the cluster,
815 * they will join this CPG group.
816 */
817 cpg_group_name = pcmk__s(crm_system_name, "unknown");
818 }
819 memset(cluster->priv->group.value, 0, 128);
820 strncpy(cluster->priv->group.value, cpg_group_name, 127);
821 cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;
822
823 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
824 if (rc != CS_OK) {
825 crm_err("Could not connect to the CPG API: %s (%d)",
826 cs_strerror(rc), rc);
827 goto bail;
828 }
829
830 rc = cpg_fd_get(handle, &fd);
831 if (rc != CS_OK) {
832 crm_err("Could not obtain the CPG API connection: %s (%d)",
833 cs_strerror(rc), rc);
834 goto bail;
835 }
836
837 /* CPG provider run as root (in given user namespace, anyway)? */
838 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
839 &found_uid, &found_gid))) {
840 crm_err("CPG provider is not authentic:"
841 " process %lld (uid: %lld, gid: %lld)",
842 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
843 (long long) found_uid, (long long) found_gid);
844 rc = CS_ERR_ACCESS;
845 goto bail;
846 } else if (rv < 0) {
847 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
848 strerror(-rv), -rv);
849 rc = CS_ERR_ACCESS;
850 goto bail;
851 }
852
853 id = pcmk__cpg_local_nodeid(handle);
854 if (id == 0) {
855 crm_err("Could not get local node id from the CPG API");
856 goto bail;
857
858 }
859 cluster->priv->node_id = id;
860
861 retries = 0;
862 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
863 if (rc != CS_OK) {
864 crm_err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
865 goto bail;
866 }
867
868 pcmk_cpg_handle = handle;
869 cluster->priv->cpg_handle = handle;
870 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
871
872 bail:
873 if (rc != CS_OK) {
874 cpg_finalize(handle);
875 // @TODO Map rc to more specific Pacemaker return code
876 return ENOTCONN;
877 }
878
879 peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
881 return pcmk_rc_ok;
882}
883
890void
892{
893 pcmk_cpg_handle = 0;
894 if (cluster->priv->cpg_handle != 0) {
895 crm_trace("Disconnecting CPG");
896 cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
897 cpg_finalize(cluster->priv->cpg_handle);
898 cluster->priv->cpg_handle = 0;
899
900 } else {
901 crm_info("No CPG connection");
902 }
903}
904
915static bool
916send_cpg_text(const char *data, const pcmk__node_status_t *node,
917 enum pcmk_ipc_server dest)
918{
919 static int msg_id = 0;
920 static int local_pid = 0;
921 static int local_name_len = 0;
922 static const char *local_name = NULL;
923
924 char *target = NULL;
925 struct iovec *iov;
926 pcmk__cpg_msg_t *msg = NULL;
927
928 if (local_name == NULL) {
929 local_name = pcmk__cluster_local_node_name();
930 }
931 if ((local_name_len == 0) && (local_name != NULL)) {
932 local_name_len = strlen(local_name);
933 }
934
935 if (data == NULL) {
936 data = "";
937 }
938
939 if (local_pid == 0) {
940 local_pid = getpid();
941 }
942
943 msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
944
945 msg_id++;
946 msg->id = msg_id;
947 msg->header.error = CS_OK;
948
949 msg->host.type = dest;
950
951 if (node != NULL) {
952 if (node->name != NULL) {
953 target = pcmk__str_copy(node->name);
954 msg->host.size = strlen(node->name);
955 memset(msg->host.uname, 0, MAX_NAME);
956 memcpy(msg->host.uname, node->name, msg->host.size);
957
958 } else {
959 target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id);
960 }
961 msg->host.id = node->cluster_layer_id;
962
963 } else {
964 target = pcmk__str_copy("all");
965 }
966
967 msg->sender.id = 0;
968 msg->sender.type = pcmk__parse_server(crm_system_name);
969 msg->sender.pid = local_pid;
970 msg->sender.size = local_name_len;
971 memset(msg->sender.uname, 0, MAX_NAME);
972
973 if ((local_name != NULL) && (msg->sender.size != 0)) {
974 memcpy(msg->sender.uname, local_name, msg->sender.size);
975 }
976
977 msg->size = 1 + strlen(data);
978 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
979
980 if (msg->size < CRM_BZ2_THRESHOLD) {
981 msg = pcmk__realloc(msg, msg->header.size);
982 memcpy(msg->data, data, msg->size);
983
984 } else {
985 char *compressed = NULL;
986 unsigned int new_size = 0;
987
988 if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
989 &new_size) == pcmk_rc_ok) {
990
991 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
992 msg = pcmk__realloc(msg, msg->header.size);
993 memcpy(msg->data, compressed, new_size);
994
995 msg->is_compressed = TRUE;
996 msg->compressed_size = new_size;
997
998 } else {
999 msg = pcmk__realloc(msg, msg->header.size);
1000 memcpy(msg->data, data, msg->size);
1001 }
1002
1003 free(compressed);
1004 }
1005
1006 iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1007 iov->iov_base = msg;
1008 iov->iov_len = msg->header.size;
1009
1010 if (msg->compressed_size > 0) {
1011 crm_trace("Queueing CPG message %" PRIu32 " to %s "
1012 "(%zu bytes, %" PRIu32 " bytes compressed payload): %.200s",
1013 msg->id, target, iov->iov_len, msg->compressed_size, data);
1014 } else {
1015 crm_trace("Queueing CPG message %" PRIu32 " to %s "
1016 "(%zu bytes, %" PRIu32 " bytes payload): %.200s",
1017 msg->id, target, iov->iov_len, msg->size, data);
1018 }
1019
1020 free(target);
1021
1022 cs_message_queue = g_list_append(cs_message_queue, iov);
1023 crm_cs_flush(&pcmk_cpg_handle);
1024
1025 return true;
1026}
1027
1038bool
1039pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
1040 enum pcmk_ipc_server dest)
1041{
1042 bool rc = true;
1043 GString *data = g_string_sized_new(1024);
1044
1045 pcmk__xml_string(msg, 0, data, 0);
1046
1047 rc = send_cpg_text(data->str, node, dest);
1048 g_string_free(data, TRUE);
1049 return rc;
1050}
const char * name
Definition cib.c:26
@ pcmk__node_search_cluster_member
Search for cluster nodes from membership cache.
Definition internal.h:63
pcmk__node_status_t * pcmk__update_peer_state(const char *source, pcmk__node_status_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
@ crm_proc_cpg
Definition internal.h:35
const char * pcmk__cluster_local_node_name(void)
Definition cluster.c:293
pcmk__node_status_t * pcmk__get_node(unsigned int id, const char *uname, const char *xml_id, uint32_t flags)
Definition membership.c:947
pcmk__node_status_t * crm_update_peer_proc(const char *source, pcmk__node_status_t *peer, uint32_t flag, const char *status)
pcmk__node_status_t * pcmk__search_node_caches(unsigned int id, const char *uname, const char *xml_id, uint32_t flags)
Definition membership.c:811
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
Definition utils.c:405
#define pcmk__assert_alloc(nmemb, size)
Definition internal.h:246
struct tcp_async_cb_data __attribute__
uint32_t compressed_size
Definition cpg.c:8
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition cpg.c:638
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from)
Definition cpg.c:395
pcmk__cpg_host_t host
Definition cpg.c:4
bool pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node, enum pcmk_ipc_server dest)
Definition cpg.c:1039
pcmk__cpg_host_t sender
Definition cpg.c:5
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object.
Definition cpg.c:744
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition cpg.c:80
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG.
Definition cpg.c:779
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object.
Definition cpg.c:762
#define CS_SEND_MAX
Definition cpg.c:196
#define msg_data_len(msg)
Definition cpg.c:84
char data[0]
Definition cpg.c:10
uint32_t size
Definition cpg.c:4
uint32_t id
Definition cpg.c:0
gboolean is_compressed
Definition cpg.c:2
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
Definition cpg.c:891
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
Definition cpg.c:106
#define cs_repeat(rc, counter, max, code)
Definition cpg.c:86
uint32_t pid
Definition cpg.c:1
enum pcmk_ipc_server type
Definition cpg.c:3
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition cpg.c:63
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition crm.h:73
char * crm_system_name
Definition utils.c:45
IPC interface to Pacemaker daemons.
pcmk_ipc_server
Available IPC interfaces.
Definition ipc.h:48
@ pcmk_ipc_unknown
Unknown or invalid.
Definition ipc.h:49
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
#define PCMK__SPECIAL_PID_AS_0(p)
#define crm_info(fmt, args...)
Definition logging.h:365
#define do_crm_log(level, fmt, args...)
Log a message.
Definition logging.h:149
#define crm_warn(fmt, args...)
Definition logging.h:360
#define crm_debug(fmt, args...)
Definition logging.h:368
#define crm_err(fmt, args...)
Definition logging.h:357
#define crm_trace(fmt, args...)
Definition logging.h:370
#define LOG_TRACE
Definition logging.h:38
Wrappers for and extensions to glib mainloop.
#define G_PRIORITY_MEDIUM
Definition mainloop.h:195
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition mainloop.c:962
#define PCMK_VALUE_OFFLINE
Definition options.h:186
#define PCMK_VALUE_MEMBER
Definition options.h:171
#define PCMK_VALUE_ONLINE
Definition options.h:187
const char * target
Definition pcmk_fence.c:31
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition results.c:617
@ pcmk_rc_compression
Definition results.h:115
@ pcmk_rc_ok
Definition results.h:159
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition results.c:1028
enum pcmk_ipc_server pcmk__parse_server(const char *text)
Definition servers.c:178
const char * pcmk__server_message_type(enum pcmk_ipc_server server)
Definition servers.c:162
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define pcmk__plural_s(i)
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition strings.c:839
@ pcmk__str_casei
#define pcmk__str_copy(str)
uint32_t node_id
Local node ID at cluster layer.
Definition internal.h:98
enum pcmk_ipc_server server
Server this connection is for (if any)
Definition internal.h:90
pcmk__cluster_private_t * priv
Definition cluster.h:36
void(* destroy)(gpointer)
Definition cluster.h:40
Node status data (may be a cluster node or a Pacemaker Remote node)
Definition internal.h:111
uint32_t cluster_layer_id
Cluster-layer numeric node ID.
Definition internal.h:165
char * name
Node name as known to cluster layer, or Pacemaker Remote node name.
Definition internal.h:113
time_t when_lost
When CPG membership was last lost.
Definition internal.h:166
Wrappers for and extensions to libxml2.
#define CRM_BZ2_THRESHOLD
Definition xml_io.h:35
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition xml_io.c:370