pacemaker  3.0.0-d8340737c4
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2024 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 
12 #include <arpa/inet.h>
13 #include <inttypes.h> // PRIu32
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdbool.h>
17 #include <stdint.h> // uint32_t
18 #include <sys/socket.h>
19 #include <sys/types.h> // size_t
20 #include <sys/utsname.h>
21 
22 #include <bzlib.h>
23 #include <corosync/corodefs.h>
24 #include <corosync/corotypes.h>
25 #include <corosync/hdb.h>
26 #include <corosync/cpg.h>
27 #include <qb/qbipc_common.h>
28 #include <qb/qbipcc.h>
29 #include <qb/qbutil.h>
30 
31 #include <crm/cluster/internal.h>
32 #include <crm/common/ipc.h>
33 #include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
34 #include <crm/common/mainloop.h>
35 #include <crm/common/xml.h>
36 
37 #include "crmcluster_private.h"
38 
39 /* @TODO Once we can update the public API to require pcmk_cluster_t* in more
40  * functions, we can ditch this in favor of cluster->cpg_handle.
41  */
42 static cpg_handle_t pcmk_cpg_handle = 0;
43 
44 // @TODO These could be moved to pcmk_cluster_t* at that time as well
// Set by the configuration-change callback when the local node is missing
// from the member list; makes pcmk_cpg_dispatch() report failure
45 static bool cpg_evicted = false;
// Outbound messages (each a struct iovec *) waiting to be multicast
46 static GList *cs_message_queue = NULL;
// ID of the timer scheduled to retry flushing the queue, or 0 if none
47 static int cs_message_timer = 0;
48 
49 /* @COMPAT Any changes to these structs (other than renames) will break all
50  * rolling upgrades, and should be avoided if possible or done at a major
51  * version bump if not
52  */
53 
// One endpoint (destination or sender) of a CPG message
54 struct pcmk__cpg_host_s {
// Node ID; a destination ID of 0 is not checked against the local node, and
// a sender ID of 0 is filled in by the receiver
55  uint32_t id;
// Process ID (set by the sender to its own PID)
56  uint32_t pid;
57  gboolean local; // Unused but needed for compatibility
58  enum pcmk_ipc_server type; // For logging only
// Length of uname in bytes; 0 means no name was given
59  uint32_t size;
// Node name, zero-padded
60  char uname[MAX_NAME];
61 } __attribute__ ((packed));
62 
63 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
64 
65 struct pcmk__cpg_msg_s {
66  struct qb_ipc_response_header header __attribute__ ((aligned(8)));
67  uint32_t id;
68  gboolean is_compressed;
69 
72 
73  uint32_t size;
74  uint32_t compressed_size;
75  /* 584 bytes */
76  char data[0];
77 
78 } __attribute__ ((packed));
79 
80 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
81 
82 static void crm_cs_flush(gpointer data);
83 
// Number of payload bytes actually present in a CPG message's data member
// (argument is parenthesized so that any pointer expression may be passed)
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
85 
/* Run a Corosync call, retrying while it reports a transient condition
 * (CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL).
 *
 * rc:      lvalue that receives the result of each attempt
 * counter: lvalue counting failed attempts so far; also the number of seconds
 *          slept before the next attempt
 * max:     stop retrying once counter reaches this value
 * code:    expression to evaluate for each attempt
 *
 * No trailing semicolon is included; the caller supplies it.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
96 
105 uint32_t
106 pcmk__cpg_local_nodeid(cpg_handle_t handle)
107 {
108  cs_error_t rc = CS_OK;
109  int retries = 0;
110  static uint32_t local_nodeid = 0;
111  cpg_handle_t local_handle = handle;
112  cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
113  int fd = -1;
114  uid_t found_uid = 0;
115  gid_t found_gid = 0;
116  pid_t found_pid = 0;
117  int rv = 0;
118 
119  if (local_nodeid != 0) {
120  return local_nodeid;
121  }
122 
123  if (handle == 0) {
124  crm_trace("Creating connection");
125  cs_repeat(rc, retries, 5,
126  cpg_model_initialize(&local_handle, CPG_MODEL_V1,
127  (cpg_model_data_t *) &cpg_model_info,
128  NULL));
129  if (rc != CS_OK) {
130  crm_err("Could not connect to the CPG API: %s (%d)",
131  cs_strerror(rc), rc);
132  return 0;
133  }
134 
135  rc = cpg_fd_get(local_handle, &fd);
136  if (rc != CS_OK) {
137  crm_err("Could not obtain the CPG API connection: %s (%d)",
138  cs_strerror(rc), rc);
139  goto bail;
140  }
141 
142  // CPG provider run as root (at least in given user namespace)?
143  rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
144  &found_uid, &found_gid);
145  if (rv == 0) {
146  crm_err("CPG provider is not authentic:"
147  " process %lld (uid: %lld, gid: %lld)",
148  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
149  (long long) found_uid, (long long) found_gid);
150  goto bail;
151 
152  } else if (rv < 0) {
153  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
154  strerror(-rv), -rv);
155  goto bail;
156  }
157  }
158 
159  if (rc == CS_OK) {
160  retries = 0;
161  crm_trace("Performing lookup");
162  cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
163  }
164 
165  if (rc != CS_OK) {
166  crm_err("Could not get local node id from the CPG API: %s (%d)",
167  pcmk__cs_err_str(rc), rc);
168  }
169 
170 bail:
171  if (handle == 0) {
172  crm_trace("Closing connection");
173  cpg_finalize(local_handle);
174  }
175  crm_debug("Local nodeid is %u", local_nodeid);
176  return local_nodeid;
177 }
178 
187 static gboolean
188 crm_cs_flush_cb(gpointer data)
189 {
190  cs_message_timer = 0;
191  crm_cs_flush(data);
192  return FALSE;
193 }
194 
195 // Send no more than this many CPG messages in one flush
196 #define CS_SEND_MAX 200
197 
204 static void
205 crm_cs_flush(gpointer data)
206 {
207  unsigned int sent = 0;
208  guint queue_len = 0;
209  cs_error_t rc = 0;
210  cpg_handle_t *handle = (cpg_handle_t *) data;
211 
212  if (*handle == 0) {
213  crm_trace("Connection is dead");
214  return;
215  }
216 
217  queue_len = g_list_length(cs_message_queue);
218  if (((queue_len % 1000) == 0) && (queue_len > 1)) {
219  crm_err("CPG queue has grown to %d", queue_len);
220 
221  } else if (queue_len == CS_SEND_MAX) {
222  crm_warn("CPG queue has grown to %d", queue_len);
223  }
224 
225  if (cs_message_timer != 0) {
226  /* There is already a timer, wait until it goes off */
227  crm_trace("Timer active %d", cs_message_timer);
228  return;
229  }
230 
231  while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
232  struct iovec *iov = cs_message_queue->data;
233 
234  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
235  if (rc != CS_OK) {
236  break;
237  }
238 
239  sent++;
240  crm_trace("CPG message sent, size=%llu",
241  (unsigned long long) iov->iov_len);
242 
243  cs_message_queue = g_list_remove(cs_message_queue, iov);
244  free(iov->iov_base);
245  free(iov);
246  }
247 
248  queue_len -= sent;
249  do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
250  "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
251  sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
252  (int) rc);
253 
254  if (cs_message_queue) {
255  uint32_t delay_ms = 100;
256  if (rc != CS_OK) {
257  /* Proportionally more if sending failed but cap at 1s */
258  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
259  }
260  cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
261  }
262 }
263 
272 static int
273 pcmk_cpg_dispatch(gpointer user_data)
274 {
275  cs_error_t rc = CS_OK;
276  pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
277 
278  rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
279  if (rc != CS_OK) {
280  crm_err("Connection to the CPG API failed: %s (%d)",
281  pcmk__cs_err_str(rc), rc);
282  cpg_finalize(cluster->priv->cpg_handle);
283  cluster->priv->cpg_handle = 0;
284  return -1;
285 
286  } else if (cpg_evicted) {
287  crm_err("Evicted from CPG membership");
288  return -1;
289  }
290  return 0;
291 }
292 
293 static inline const char *
294 ais_dest(const pcmk__cpg_host_t *host)
295 {
296  return (host->size > 0)? host->uname : "<all>";
297 }
298 
299 static inline const char *
300 msg_type2text(enum pcmk_ipc_server type)
301 {
302  const char *name = pcmk__server_message_type(type);
303 
304  return pcmk__s(name, "unknown");
305 }
306 
315 static bool
316 check_message_sanity(const pcmk__cpg_msg_t *msg)
317 {
318  int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
319 
320  if (payload_size < 1) {
321  crm_err("%sCPG message %d from %s invalid: "
322  "Claimed size of %d bytes is too small "
323  QB_XS " from %s[%u] to %s@%s",
324  (msg->is_compressed? "Compressed " : ""),
325  msg->id, ais_dest(&(msg->sender)),
326  (int) msg->header.size,
327  msg_type2text(msg->sender.type), msg->sender.pid,
328  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
329  return false;
330  }
331 
332  if (msg->header.error != CS_OK) {
333  crm_err("%sCPG message %d from %s invalid: "
334  "Sender indicated error %d "
335  QB_XS " from %s[%u] to %s@%s",
336  (msg->is_compressed? "Compressed " : ""),
337  msg->id, ais_dest(&(msg->sender)),
338  msg->header.error,
339  msg_type2text(msg->sender.type), msg->sender.pid,
340  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
341  return false;
342  }
343 
344  if (msg_data_len(msg) != payload_size) {
345  crm_err("%sCPG message %d from %s invalid: "
346  "Total size %d inconsistent with payload size %d "
347  QB_XS " from %s[%u] to %s@%s",
348  (msg->is_compressed? "Compressed " : ""),
349  msg->id, ais_dest(&(msg->sender)),
350  (int) msg->header.size, (int) msg_data_len(msg),
351  msg_type2text(msg->sender.type), msg->sender.pid,
352  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
353  return false;
354  }
355 
356  if (!msg->is_compressed &&
357  /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
358  * but checking the last byte or two should be quick
359  */
360  (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
361  || (msg->data[msg->size - 1] != '\0'))) {
362  crm_err("CPG message %d from %s invalid: "
363  "Payload does not end at byte %llu "
364  QB_XS " from %s[%u] to %s@%s",
365  msg->id, ais_dest(&(msg->sender)),
366  (unsigned long long) msg->size,
367  msg_type2text(msg->sender.type), msg->sender.pid,
368  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
369  return false;
370  }
371 
372  crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
373  (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
374  msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
375  ais_dest(&(msg->sender)),
376  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
377  return true;
378 }
379 
396 char *
397 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
398  void *content, const char **from)
399 {
400  char *data = NULL;
401  pcmk__cpg_msg_t *msg = content;
402 
403  if (from != NULL) {
404  *from = NULL;
405  }
406 
407  if (handle != 0) {
408  uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
409  const char *local_name = pcmk__cluster_local_node_name();
410 
411  // Update or validate message sender ID
412  if (msg->sender.id == 0) {
413  msg->sender.id = sender_id;
414  } else if (msg->sender.id != sender_id) {
415  crm_warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
416  ": claimed ID %" PRIu32,
417  sender_id, pid, msg->sender.id);
418  return NULL;
419  }
420 
421  // Ignore messages that aren't for the local node
422  if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
423  crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
424  ": for ID %" PRIu32 " not %" PRIu32,
425  sender_id, pid, msg->host.id, local_nodeid);
426  return NULL;
427  }
428  if ((msg->host.size > 0)
429  && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
430 
431  crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
432  ": for name %s not %s",
433  sender_id, pid, msg->host.uname, local_name);
434  return NULL;
435  }
436 
437  // Add sender name if not in original message
438  if (msg->sender.size == 0) {
439  const pcmk__node_status_t *peer =
440  pcmk__get_node(sender_id, NULL, NULL,
442 
443  if (peer->name == NULL) {
444  crm_debug("Received CPG message from node with ID %" PRIu32
445  " but its name is unknown", sender_id);
446  } else {
447  crm_debug("Updating name of CPG message sender with ID %" PRIu32
448  " to %s", sender_id, peer->name);
449  msg->sender.size = strlen(peer->name);
450  memset(msg->sender.uname, 0, MAX_NAME);
451  memcpy(msg->sender.uname, peer->name, msg->sender.size);
452  }
453  }
454  }
455 
456  // Ensure sender is in peer cache (though it should already be)
457  pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
459 
460  if (from != NULL) {
461  *from = msg->sender.uname;
462  }
463 
464  if (!check_message_sanity(msg)) {
465  return NULL;
466  }
467 
468  if (msg->is_compressed && (msg->size > 0)) {
469  int rc = BZ_OK;
470  unsigned int new_size = msg->size + 1;
471  char *uncompressed = pcmk__assert_alloc(1, new_size);
472 
473  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
474  msg->compressed_size, 1, 0);
475  rc = pcmk__bzlib2rc(rc);
476  if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug?
477  rc = pcmk_rc_compression;
478  }
479  if (rc != pcmk_rc_ok) {
480  free(uncompressed);
481  crm_warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
482  " PID %" PRIu32 "): %s",
483  msg->id, ais_dest(&(msg->sender)), sender_id, pid,
484  pcmk_rc_str(rc));
485  return NULL;
486  }
487  data = uncompressed;
488 
489  } else {
490  data = pcmk__str_copy(msg->data);
491  }
492 
493  crm_trace("Received %sCPG message %d from %s (ID %" PRIu32
494  " PID %" PRIu32 "): %.40s...",
495  (msg->is_compressed? "compressed " : ""),
496  msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data);
497  return data;
498 }
499 
511 static int
512 cmp_member_list_nodeid(const void *first, const void *second)
513 {
514  const struct cpg_address *const a = *((const struct cpg_address **) first),
515  *const b = *((const struct cpg_address **) second);
516  if (a->nodeid < b->nodeid) {
517  return -1;
518  } else if (a->nodeid > b->nodeid) {
519  return 1;
520  }
521  /* don't bother with "reason" nor "pid" */
522  return 0;
523 }
524 
533 static const char *
534 cpgreason2str(cpg_reason_t reason)
535 {
536  switch (reason) {
537  case CPG_REASON_JOIN: return " via cpg_join";
538  case CPG_REASON_LEAVE: return " via cpg_leave";
539  case CPG_REASON_NODEDOWN: return " via cluster exit";
540  case CPG_REASON_NODEUP: return " via cluster join";
541  case CPG_REASON_PROCDOWN: return " for unknown reason";
542  default: break;
543  }
544  return "";
545 }
546 
555 static inline const char *
556 peer_name(const pcmk__node_status_t *peer)
557 {
558  return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
559 }
560 
572 static void
573 node_left(const char *cpg_group_name, int event_counter,
574  uint32_t local_nodeid, const struct cpg_address *cpg_peer,
575  const struct cpg_address **sorted_member_list,
576  size_t member_list_entries)
577 {
578  pcmk__node_status_t *peer =
579  pcmk__search_node_caches(cpg_peer->nodeid, NULL,
581  const struct cpg_address **rival = NULL;
582 
583  /* Most CPG-related Pacemaker code assumes that only one process on a node
584  * can be in the process group, but Corosync does not impose this
585  * limitation, and more than one can be a member in practice due to a
586  * daemon attempting to start while another instance is already running.
587  *
588  * Check for any such duplicate instances, because we don't want to process
589  * their leaving as if our actual peer left. If the peer that left still has
590  * an entry in sorted_member_list (with a different PID), we will ignore the
591  * leaving.
592  *
593  * @TODO Track CPG members' PIDs so we can tell exactly who left.
594  */
595  if (peer != NULL) {
596  rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
597  sizeof(const struct cpg_address *),
598  cmp_member_list_nodeid);
599  }
600 
601  if (rival == NULL) {
602  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
603  cpg_group_name, event_counter, peer_name(peer),
604  cpg_peer->nodeid, cpg_peer->pid,
605  cpgreason2str(cpg_peer->reason));
606  if (peer != NULL) {
607  crm_update_peer_proc(__func__, peer, crm_proc_cpg,
609  }
610  } else if (cpg_peer->nodeid == local_nodeid) {
611  crm_warn("Group %s event %d: duplicate local pid %u left%s",
612  cpg_group_name, event_counter,
613  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
614  } else {
615  crm_warn("Group %s event %d: "
616  "%s (node %u) duplicate pid %u left%s (%u remains)",
617  cpg_group_name, event_counter, peer_name(peer),
618  cpg_peer->nodeid, cpg_peer->pid,
619  cpgreason2str(cpg_peer->reason), (*rival)->pid);
620  }
621 }
622 
639 void
640 pcmk__cpg_confchg_cb(cpg_handle_t handle,
641  const struct cpg_name *group_name,
642  const struct cpg_address *member_list,
643  size_t member_list_entries,
644  const struct cpg_address *left_list,
645  size_t left_list_entries,
646  const struct cpg_address *joined_list,
647  size_t joined_list_entries)
648 {
649  static int counter = 0;
650 
651  bool found = false;
652  uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
653  const struct cpg_address **sorted = NULL;
654 
655  sorted = pcmk__assert_alloc(member_list_entries,
656  sizeof(const struct cpg_address *));
657 
658  for (size_t iter = 0; iter < member_list_entries; iter++) {
659  sorted[iter] = member_list + iter;
660  }
661 
662  // So that the cross-matching of multiply-subscribed nodes is then cheap
663  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
664  cmp_member_list_nodeid);
665 
666  for (int i = 0; i < left_list_entries; i++) {
667  node_left(group_name->value, counter, local_nodeid, &left_list[i],
668  sorted, member_list_entries);
669  }
670  free(sorted);
671  sorted = NULL;
672 
673  for (int i = 0; i < joined_list_entries; i++) {
674  crm_info("Group %s event %d: node %u pid %u joined%s",
675  group_name->value, counter, joined_list[i].nodeid,
676  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
677  }
678 
679  for (int i = 0; i < member_list_entries; i++) {
680  pcmk__node_status_t *peer =
681  pcmk__get_node(member_list[i].nodeid, NULL, NULL,
683 
684  if (member_list[i].nodeid == local_nodeid
685  && member_list[i].pid != getpid()) {
686  // See the note in node_left()
687  crm_warn("Group %s event %d: detected duplicate local pid %u",
688  group_name->value, counter, member_list[i].pid);
689  continue;
690  }
691  crm_info("Group %s event %d: %s (node %u pid %u) is member",
692  group_name->value, counter, peer_name(peer),
693  member_list[i].nodeid, member_list[i].pid);
694 
695  /* If the caller left auto-reaping enabled, this will also update the
696  * state to member.
697  */
698  peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
700 
701  if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
702  /* The node is a CPG member, but we currently think it's not a
703  * cluster member. This is possible only if auto-reaping was
704  * disabled. The node may be joining, and we happened to get the CPG
705  * notification before the quorum notification; or the node may have
706  * just died, and we are processing its final messages; or a bug
707  * has affected the peer cache.
708  */
709  time_t now = time(NULL);
710 
711  if (peer->when_lost == 0) {
712  // Track when we first got into this contradictory state
713  peer->when_lost = now;
714 
715  } else if (now > (peer->when_lost + 60)) {
716  // If it persists for more than a minute, update the state
717  crm_warn("Node %u is member of group %s but was believed "
718  "offline",
719  member_list[i].nodeid, group_name->value);
720  pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
721  }
722  }
723 
724  if (local_nodeid == member_list[i].nodeid) {
725  found = true;
726  }
727  }
728 
729  if (!found) {
730  crm_err("Local node was evicted from group %s", group_name->value);
731  cpg_evicted = true;
732  }
733 
734  counter++;
735 }
736 
745 int
746 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
747 {
748  if (cluster == NULL) {
749  return EINVAL;
750  }
751  cluster->cpg.cpg_deliver_fn = fn;
752  return pcmk_rc_ok;
753 }
754 
763 int
764 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
765 {
766  if (cluster == NULL) {
767  return EINVAL;
768  }
769  cluster->cpg.cpg_confchg_fn = fn;
770  return pcmk_rc_ok;
771 }
772 
780 int
782 {
783  cs_error_t rc;
784  int fd = -1;
785  int retries = 0;
786  uint32_t id = 0;
787  pcmk__node_status_t *peer = NULL;
788  cpg_handle_t handle = 0;
789  const char *cpg_group_name = NULL;
790  uid_t found_uid = 0;
791  gid_t found_gid = 0;
792  pid_t found_pid = 0;
793  int rv;
794 
795  struct mainloop_fd_callbacks cpg_fd_callbacks = {
796  .dispatch = pcmk_cpg_dispatch,
797  .destroy = cluster->destroy,
798  };
799 
800  cpg_model_v1_data_t cpg_model_info = {
801  .model = CPG_MODEL_V1,
802  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
803  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
804  .cpg_totem_confchg_fn = NULL,
805  .flags = 0,
806  };
807 
808  cpg_evicted = false;
809 
810  cpg_group_name = pcmk__server_message_type(cluster->priv->server);
811  if (cpg_group_name == NULL) {
812  /* The name will already be non-NULL for Pacemaker servers. If a
813  * command-line tool or external caller connects to the cluster,
814  * they will join this CPG group.
815  */
816  cpg_group_name = pcmk__s(crm_system_name, "unknown");
817  }
818  memset(cluster->priv->group.value, 0, 128);
819  strncpy(cluster->priv->group.value, cpg_group_name, 127);
820  cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;
821 
822  cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
823  if (rc != CS_OK) {
824  crm_err("Could not connect to the CPG API: %s (%d)",
825  cs_strerror(rc), rc);
826  goto bail;
827  }
828 
829  rc = cpg_fd_get(handle, &fd);
830  if (rc != CS_OK) {
831  crm_err("Could not obtain the CPG API connection: %s (%d)",
832  cs_strerror(rc), rc);
833  goto bail;
834  }
835 
836  /* CPG provider run as root (in given user namespace, anyway)? */
837  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
838  &found_uid, &found_gid))) {
839  crm_err("CPG provider is not authentic:"
840  " process %lld (uid: %lld, gid: %lld)",
841  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
842  (long long) found_uid, (long long) found_gid);
843  rc = CS_ERR_ACCESS;
844  goto bail;
845  } else if (rv < 0) {
846  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
847  strerror(-rv), -rv);
848  rc = CS_ERR_ACCESS;
849  goto bail;
850  }
851 
852  id = pcmk__cpg_local_nodeid(handle);
853  if (id == 0) {
854  crm_err("Could not get local node id from the CPG API");
855  goto bail;
856 
857  }
858  cluster->priv->node_id = id;
859 
860  retries = 0;
861  cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
862  if (rc != CS_OK) {
863  crm_err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
864  goto bail;
865  }
866 
867  pcmk_cpg_handle = handle;
868  cluster->priv->cpg_handle = handle;
869  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
870 
871  bail:
872  if (rc != CS_OK) {
873  cpg_finalize(handle);
874  // @TODO Map rc to more specific Pacemaker return code
875  return ENOTCONN;
876  }
877 
878  peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
880  return pcmk_rc_ok;
881 }
882 
889 void
891 {
892  pcmk_cpg_handle = 0;
893  if (cluster->priv->cpg_handle != 0) {
894  crm_trace("Disconnecting CPG");
895  cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
896  cpg_finalize(cluster->priv->cpg_handle);
897  cluster->priv->cpg_handle = 0;
898 
899  } else {
900  crm_info("No CPG connection");
901  }
902 }
903 
914 static bool
915 send_cpg_text(const char *data, const pcmk__node_status_t *node,
916  enum pcmk_ipc_server dest)
917 {
918  static int msg_id = 0;
919  static int local_pid = 0;
920  static int local_name_len = 0;
921  static const char *local_name = NULL;
922 
923  char *target = NULL;
924  struct iovec *iov;
925  pcmk__cpg_msg_t *msg = NULL;
926 
927  if (local_name == NULL) {
928  local_name = pcmk__cluster_local_node_name();
929  }
930  if ((local_name_len == 0) && (local_name != NULL)) {
931  local_name_len = strlen(local_name);
932  }
933 
934  if (data == NULL) {
935  data = "";
936  }
937 
938  if (local_pid == 0) {
939  local_pid = getpid();
940  }
941 
942  msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
943 
944  msg_id++;
945  msg->id = msg_id;
946  msg->header.error = CS_OK;
947 
948  msg->host.type = dest;
949 
950  if (node != NULL) {
951  if (node->name != NULL) {
952  target = pcmk__str_copy(node->name);
953  msg->host.size = strlen(node->name);
954  memset(msg->host.uname, 0, MAX_NAME);
955  memcpy(msg->host.uname, node->name, msg->host.size);
956 
957  } else {
958  target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id);
959  }
960  msg->host.id = node->cluster_layer_id;
961 
962  } else {
963  target = pcmk__str_copy("all");
964  }
965 
966  msg->sender.id = 0;
967  msg->sender.type = pcmk__parse_server(crm_system_name);
968  msg->sender.pid = local_pid;
969  msg->sender.size = local_name_len;
970  memset(msg->sender.uname, 0, MAX_NAME);
971 
972  if ((local_name != NULL) && (msg->sender.size != 0)) {
973  memcpy(msg->sender.uname, local_name, msg->sender.size);
974  }
975 
976  msg->size = 1 + strlen(data);
977  msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
978 
979  if (msg->size < CRM_BZ2_THRESHOLD) {
980  msg = pcmk__realloc(msg, msg->header.size);
981  memcpy(msg->data, data, msg->size);
982 
983  } else {
984  char *compressed = NULL;
985  unsigned int new_size = 0;
986 
987  if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
988  &new_size) == pcmk_rc_ok) {
989 
990  msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
991  msg = pcmk__realloc(msg, msg->header.size);
992  memcpy(msg->data, compressed, new_size);
993 
994  msg->is_compressed = TRUE;
995  msg->compressed_size = new_size;
996 
997  } else {
998  // cppcheck seems not to understand the abort logic in pcmk__realloc
999  // cppcheck-suppress memleak
1000  msg = pcmk__realloc(msg, msg->header.size);
1001  memcpy(msg->data, data, msg->size);
1002  }
1003 
1004  free(compressed);
1005  }
1006 
1007  iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1008  iov->iov_base = msg;
1009  iov->iov_len = msg->header.size;
1010 
1011  if (msg->compressed_size > 0) {
1012  crm_trace("Queueing CPG message %u to %s "
1013  "(%llu bytes, %d bytes compressed payload): %.200s",
1014  msg->id, target, (unsigned long long) iov->iov_len,
1015  msg->compressed_size, data);
1016  } else {
1017  crm_trace("Queueing CPG message %u to %s "
1018  "(%llu bytes, %d bytes payload): %.200s",
1019  msg->id, target, (unsigned long long) iov->iov_len,
1020  msg->size, data);
1021  }
1022 
1023  free(target);
1024 
1025  cs_message_queue = g_list_append(cs_message_queue, iov);
1026  crm_cs_flush(&pcmk_cpg_handle);
1027 
1028  return true;
1029 }
1030 
1041 bool
1042 pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
1043  enum pcmk_ipc_server dest)
1044 {
1045  bool rc = true;
1046  GString *data = g_string_sized_new(1024);
1047 
1048  pcmk__xml_string(msg, 0, data, 0);
1049 
1050  rc = send_cpg_text(data->str, node, dest);
1051  g_string_free(data, TRUE);
1052  return rc;
1053 }
#define LOG_TRACE
Definition: logging.h:38
pcmk__cpg_host_t host
Definition: cpg.c:52
pcmk__node_status_t * pcmk__get_node(unsigned int id, const char *uname, const char *xml_id, uint32_t flags)
Definition: membership.c:927
#define cs_repeat(rc, counter, max, code)
Definition: cpg.c:86
uint32_t size
Definition: cpg.c:52
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:106
char data[0]
Definition: cpg.c:58
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:956
#define CRM_BZ2_THRESHOLD
Definition: xml_io.h:35
const char * name
Definition: cib.c:26
enum pcmk_ipc_server type
Definition: cpg.c:51
pcmk__cpg_host_t sender
Definition: cpg.c:53
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition: xml_io.c:411
Search for cluster nodes from membership cache.
Definition: internal.h:64
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:53
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition: cpg.c:80
char * crm_system_name
Definition: utils.c:44
const char * pcmk__cluster_local_node_name(void)
Definition: cluster.c:289
const char * pcmk__server_message_type(enum pcmk_ipc_server server)
Definition: servers.c:162
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:640
#define CS_SEND_MAX
Definition: cpg.c:196
bool pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node, enum pcmk_ipc_server dest)
Definition: cpg.c:1042
uint32_t cluster_layer_id
Cluster-layer numeric node ID.
Definition: internal.h:166
pcmk__node_status_t * pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
Definition: membership.c:801
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:609
enum pcmk_ipc_server server
Server this connection is for (if any)
Definition: internal.h:92
Wrappers for and extensions to glib mainloop.
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG.
Definition: cpg.c:781
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition: mainloop.h:150
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition: cpg.c:63
gboolean local
Definition: cpg.c:50
#define crm_warn(fmt, args...)
Definition: logging.h:362
pcmk_ipc_server
Available IPC interfaces.
Definition: ipc.h:48
uint32_t pid
Definition: cpg.c:49
void(* destroy)(gpointer)
Definition: cluster.h:40
#define crm_debug(fmt, args...)
Definition: logging.h:370
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
Definition: utils.c:401
#define crm_trace(fmt, args...)
Definition: logging.h:372
#define do_crm_log(level, fmt, args...)
Log a message.
Definition: logging.h:149
#define PCMK_VALUE_MEMBER
Definition: options.h:170
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object.
Definition: cpg.c:746
Wrappers for and extensions to libxml2.
enum pcmk_ipc_server pcmk__parse_server(const char *text)
Definition: servers.c:178
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:839
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object.
Definition: cpg.c:764
uint32_t node_id
Local node ID at cluster layer.
Definition: internal.h:99
#define pcmk__str_copy(str)
uint32_t id
Definition: cpg.c:48
struct pcmk__cpg_host_s __attribute__((packed))
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition: crm.h:73
uint32_t compressed_size
Definition: cpg.c:56
const char * target
Definition: pcmk_fence.c:31
#define PCMK_VALUE_ONLINE
Definition: options.h:186
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition: results.c:1014
gboolean is_compressed
Definition: cpg.c:50
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
Definition: cpg.c:890
#define crm_err(fmt, args...)
Definition: logging.h:359
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:194
pcmk__node_status_t * pcmk__update_peer_state(const char *source, pcmk__node_status_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1321
char uname[MAX_NAME]
Definition: cpg.c:53
#define msg_data_len(msg)
Definition: cpg.c:84
#define pcmk__plural_s(i)
IPC interface to Pacemaker daemons.
time_t when_lost
When CPG membership was last lost.
Definition: internal.h:167
#define pcmk__assert_alloc(nmemb, size)
Definition: internal.h:257
pcmk__node_status_t * crm_update_peer_proc(const char *source, pcmk__node_status_t *peer, uint32_t flag, const char *status)
Definition: membership.c:1095
#define PCMK_VALUE_OFFLINE
Definition: options.h:185
#define crm_info(fmt, args...)
Definition: logging.h:367
pcmk__cluster_private_t * priv
Definition: cluster.h:36
Node status data (may be a cluster node or a Pacemaker Remote node)
Definition: internal.h:112
char * name
Node name as known to cluster layer, or Pacemaker Remote node name.
Definition: internal.h:114
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from)
Definition: cpg.c:397
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
Definition: ipc_client.c:1532