pacemaker  2.1.8-3980678f03
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2024 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 
12 #include <arpa/inet.h>
13 #include <inttypes.h> // PRIu32
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdbool.h>
17 #include <stdint.h> // uint32_t
18 #include <sys/socket.h>
19 #include <sys/types.h> // size_t
20 #include <sys/utsname.h>
21 
22 #include <bzlib.h>
23 #include <corosync/corodefs.h>
24 #include <corosync/corotypes.h>
25 #include <corosync/hdb.h>
26 #include <corosync/cpg.h>
27 #include <qb/qbipc_common.h>
28 #include <qb/qbipcc.h>
29 #include <qb/qbutil.h>
30 
31 #include <crm/cluster/internal.h>
32 #include <crm/common/ipc.h>
33 #include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
34 #include <crm/common/mainloop.h>
35 #include <crm/common/xml.h>
36 
37 #include "crmcluster_private.h"
38 
39 /* @TODO Once we can update the public API to require pcmk_cluster_t* in more
40  * functions, we can ditch this in favor of cluster->cpg_handle.
41  */
/* Handle of the CPG connection: assigned when the connection is established,
 * reset to 0 on disconnect, and passed by address to crm_cs_flush() whenever a
 * message is queued.
 */
42 static cpg_handle_t pcmk_cpg_handle = 0;
43 
44 // @TODO These could be moved to pcmk_cluster_t* at that time as well
// Set once a configuration change no longer lists the local node as a member
45 static bool cpg_evicted = false;
// Outgoing messages (struct iovec *) waiting to be multicast by crm_cs_flush()
46 static GList *cs_message_queue = NULL;
// ID of the pending glib timeout that will retry the flush (0 if none)
47 static int cs_message_timer = 0;
48 
49 struct pcmk__cpg_host_s {
50  uint32_t id;
51  uint32_t pid;
52  gboolean local;
54  uint32_t size;
55  char uname[MAX_NAME];
56 } __attribute__ ((packed));
57 
58 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
59 
60 struct pcmk__cpg_msg_s {
61  struct qb_ipc_response_header header __attribute__ ((aligned(8)));
62  uint32_t id;
63  gboolean is_compressed;
64 
67 
68  uint32_t size;
69  uint32_t compressed_size;
70  /* 584 bytes */
71  char data[0];
72 
73 } __attribute__ ((packed));
74 
75 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
76 
77 static void crm_cs_flush(gpointer data);
78 
/* Number of payload bytes carried by a CPG message: the compressed size when
 * the payload is compressed, otherwise the plain size. The argument is
 * parenthesized so that any pointer expression (such as &msg) can be passed.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
80 
/* Run a Corosync API call, retrying while it reports CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL.
 *
 * rc       lvalue that receives the result of each attempt
 * counter  lvalue counting failed attempts; also the number of seconds slept
 *          before the next attempt, so the delay grows by 1s per retry
 * max      stop retrying once counter reaches this value
 * code     expression to evaluate (re-evaluated on every attempt)
 *
 * All arguments are parenthesized so that arbitrary expressions are safe.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
91 
100 uint32_t
101 pcmk__cpg_local_nodeid(cpg_handle_t handle)
102 {
103  cs_error_t rc = CS_OK;
104  int retries = 0;
105  static uint32_t local_nodeid = 0;
106  cpg_handle_t local_handle = handle;
107  cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
108  int fd = -1;
109  uid_t found_uid = 0;
110  gid_t found_gid = 0;
111  pid_t found_pid = 0;
112  int rv = 0;
113 
114  if (local_nodeid != 0) {
115  return local_nodeid;
116  }
117 
118  if (handle == 0) {
119  crm_trace("Creating connection");
120  cs_repeat(rc, retries, 5,
121  cpg_model_initialize(&local_handle, CPG_MODEL_V1,
122  (cpg_model_data_t *) &cpg_model_info,
123  NULL));
124  if (rc != CS_OK) {
125  crm_err("Could not connect to the CPG API: %s (%d)",
126  cs_strerror(rc), rc);
127  return 0;
128  }
129 
130  rc = cpg_fd_get(local_handle, &fd);
131  if (rc != CS_OK) {
132  crm_err("Could not obtain the CPG API connection: %s (%d)",
133  cs_strerror(rc), rc);
134  goto bail;
135  }
136 
137  // CPG provider run as root (at least in given user namespace)?
138  rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
139  &found_uid, &found_gid);
140  if (rv == 0) {
141  crm_err("CPG provider is not authentic:"
142  " process %lld (uid: %lld, gid: %lld)",
143  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
144  (long long) found_uid, (long long) found_gid);
145  goto bail;
146 
147  } else if (rv < 0) {
148  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
149  strerror(-rv), -rv);
150  goto bail;
151  }
152  }
153 
154  if (rc == CS_OK) {
155  retries = 0;
156  crm_trace("Performing lookup");
157  cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
158  }
159 
160  if (rc != CS_OK) {
161  crm_err("Could not get local node id from the CPG API: %s (%d)",
162  pcmk__cs_err_str(rc), rc);
163  }
164 
165 bail:
166  if (handle == 0) {
167  crm_trace("Closing connection");
168  cpg_finalize(local_handle);
169  }
170  crm_debug("Local nodeid is %u", local_nodeid);
171  return local_nodeid;
172 }
173 
182 static gboolean
183 crm_cs_flush_cb(gpointer data)
184 {
185  cs_message_timer = 0;
186  crm_cs_flush(data);
187  return FALSE;
188 }
189 
190 // Send no more than this many CPG messages in one flush
191 #define CS_SEND_MAX 200
192 
199 static void
200 crm_cs_flush(gpointer data)
201 {
202  unsigned int sent = 0;
203  guint queue_len = 0;
204  cs_error_t rc = 0;
205  cpg_handle_t *handle = (cpg_handle_t *) data;
206 
207  if (*handle == 0) {
208  crm_trace("Connection is dead");
209  return;
210  }
211 
212  queue_len = g_list_length(cs_message_queue);
213  if (((queue_len % 1000) == 0) && (queue_len > 1)) {
214  crm_err("CPG queue has grown to %d", queue_len);
215 
216  } else if (queue_len == CS_SEND_MAX) {
217  crm_warn("CPG queue has grown to %d", queue_len);
218  }
219 
220  if (cs_message_timer != 0) {
221  /* There is already a timer, wait until it goes off */
222  crm_trace("Timer active %d", cs_message_timer);
223  return;
224  }
225 
226  while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
227  struct iovec *iov = cs_message_queue->data;
228 
229  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
230  if (rc != CS_OK) {
231  break;
232  }
233 
234  sent++;
235  crm_trace("CPG message sent, size=%llu",
236  (unsigned long long) iov->iov_len);
237 
238  cs_message_queue = g_list_remove(cs_message_queue, iov);
239  free(iov->iov_base);
240  free(iov);
241  }
242 
243  queue_len -= sent;
244  do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
245  "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
246  sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
247  (int) rc);
248 
249  if (cs_message_queue) {
250  uint32_t delay_ms = 100;
251  if (rc != CS_OK) {
252  /* Proportionally more if sending failed but cap at 1s */
253  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
254  }
255  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
256  }
257 }
258 
267 static int
268 pcmk_cpg_dispatch(gpointer user_data)
269 {
270  cs_error_t rc = CS_OK;
271  pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
272 
273  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
274  if (rc != CS_OK) {
275  crm_err("Connection to the CPG API failed: %s (%d)",
276  pcmk__cs_err_str(rc), rc);
277  cpg_finalize(cluster->cpg_handle);
278  cluster->cpg_handle = 0;
279  return -1;
280 
281  } else if (cpg_evicted) {
282  crm_err("Evicted from CPG membership");
283  return -1;
284  }
285  return 0;
286 }
287 
288 static inline const char *
289 ais_dest(const pcmk__cpg_host_t *host)
290 {
291  if (host->local) {
292  return "local";
293  } else if (host->size > 0) {
294  return host->uname;
295  } else {
296  return "<all>";
297  }
298 }
299 
300 static inline const char *
301 msg_type2text(enum crm_ais_msg_types type)
302 {
303  const char *text = "unknown";
304 
305  switch (type) {
306  case crm_msg_none:
307  text = "unknown";
308  break;
309  case crm_msg_ais:
310  text = "ais";
311  break;
312  case crm_msg_cib:
313  text = "cib";
314  break;
315  case crm_msg_crmd:
316  text = "crmd";
317  break;
318  case crm_msg_pe:
319  text = "pengine";
320  break;
321  case crm_msg_te:
322  text = "tengine";
323  break;
324  case crm_msg_lrmd:
325  text = "lrmd";
326  break;
327  case crm_msg_attrd:
328  text = "attrd";
329  break;
330  case crm_msg_stonithd:
331  text = "stonithd";
332  break;
333  case crm_msg_stonith_ng:
334  text = "stonith-ng";
335  break;
336  }
337  return text;
338 }
339 
348 static bool
349 check_message_sanity(const pcmk__cpg_msg_t *msg)
350 {
351  int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
352 
353  if (payload_size < 1) {
354  crm_err("%sCPG message %d from %s invalid: "
355  "Claimed size of %d bytes is too small "
356  CRM_XS " from %s[%u] to %s@%s",
357  (msg->is_compressed? "Compressed " : ""),
358  msg->id, ais_dest(&(msg->sender)),
359  (int) msg->header.size,
360  msg_type2text(msg->sender.type), msg->sender.pid,
361  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
362  return false;
363  }
364 
365  if (msg->header.error != CS_OK) {
366  crm_err("%sCPG message %d from %s invalid: "
367  "Sender indicated error %d "
368  CRM_XS " from %s[%u] to %s@%s",
369  (msg->is_compressed? "Compressed " : ""),
370  msg->id, ais_dest(&(msg->sender)),
371  msg->header.error,
372  msg_type2text(msg->sender.type), msg->sender.pid,
373  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
374  return false;
375  }
376 
377  if (msg_data_len(msg) != payload_size) {
378  crm_err("%sCPG message %d from %s invalid: "
379  "Total size %d inconsistent with payload size %d "
380  CRM_XS " from %s[%u] to %s@%s",
381  (msg->is_compressed? "Compressed " : ""),
382  msg->id, ais_dest(&(msg->sender)),
383  (int) msg->header.size, (int) msg_data_len(msg),
384  msg_type2text(msg->sender.type), msg->sender.pid,
385  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
386  return false;
387  }
388 
389  if (!msg->is_compressed &&
390  /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
391  * but checking the last byte or two should be quick
392  */
393  (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
394  || (msg->data[msg->size - 1] != '\0'))) {
395  crm_err("CPG message %d from %s invalid: "
396  "Payload does not end at byte %llu "
397  CRM_XS " from %s[%u] to %s@%s",
398  msg->id, ais_dest(&(msg->sender)),
399  (unsigned long long) msg->size,
400  msg_type2text(msg->sender.type), msg->sender.pid,
401  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
402  return false;
403  }
404 
405  crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
406  (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
407  msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
408  ais_dest(&(msg->sender)),
409  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
410  return true;
411 }
412 
431 char *
432 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
433  void *content, uint32_t *kind, const char **from)
434 {
435  char *data = NULL;
436  pcmk__cpg_msg_t *msg = content;
437 
438  if (handle != 0) {
439  // Do filtering and field massaging
440  uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
441  const char *local_name = pcmk__cluster_local_node_name();
442 
443  if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
444  crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
445  ": claimed nodeid=%" PRIu32,
446  sender_id, pid, msg->sender.id);
447  return NULL;
448  }
449  if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
450  crm_trace("Not for us: %" PRIu32" != %" PRIu32,
451  msg->host.id, local_nodeid);
452  return NULL;
453  }
454  if ((msg->host.size > 0)
455  && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
456 
457  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
458  return NULL;
459  }
460 
461  msg->sender.id = sender_id;
462  if (msg->sender.size == 0) {
463  const crm_node_t *peer =
464  pcmk__get_node(sender_id, NULL, NULL,
466 
467  if (peer->uname == NULL) {
468  crm_err("No uname for peer with nodeid=%u", sender_id);
469 
470  } else {
471  crm_notice("Fixing uname for peer with nodeid=%u", sender_id);
472  msg->sender.size = strlen(peer->uname);
473  memset(msg->sender.uname, 0, MAX_NAME);
474  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
475  }
476  }
477  }
478 
479  crm_trace("Got new%s message (size=%d, %d, %d)",
480  msg->is_compressed ? " compressed" : "",
481  msg_data_len(msg), msg->size, msg->compressed_size);
482 
483  if (kind != NULL) {
484  *kind = msg->header.id;
485  }
486  if (from != NULL) {
487  *from = msg->sender.uname;
488  }
489 
490  if (msg->is_compressed && (msg->size > 0)) {
491  int rc = BZ_OK;
492  char *uncompressed = NULL;
493  unsigned int new_size = msg->size + 1;
494 
495  if (!check_message_sanity(msg)) {
496  goto badmsg;
497  }
498 
499  crm_trace("Decompressing message data");
500  uncompressed = pcmk__assert_alloc(1, new_size);
501  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
502  msg->compressed_size, 1, 0);
503 
504  rc = pcmk__bzlib2rc(rc);
505 
506  if (rc != pcmk_rc_ok) {
507  crm_err("Decompression failed: %s " CRM_XS " rc=%d",
508  pcmk_rc_str(rc), rc);
509  free(uncompressed);
510  goto badmsg;
511  }
512 
513  CRM_ASSERT(new_size == msg->size);
514 
515  data = uncompressed;
516 
517  } else if (!check_message_sanity(msg)) {
518  goto badmsg;
519 
520  } else {
521  data = strdup(msg->data);
522  }
523 
524  // Is this necessary?
525  pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
527 
528  crm_trace("Payload: %.200s", data);
529  return data;
530 
531  badmsg:
532  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
533  " min=%d, total=%d, size=%d, bz2_size=%d",
534  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
535  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
536  msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
537  msg->header.size, msg->size, msg->compressed_size);
538 
539  free(data);
540  return NULL;
541 }
542 
554 static int
555 cmp_member_list_nodeid(const void *first, const void *second)
556 {
557  const struct cpg_address *const a = *((const struct cpg_address **) first),
558  *const b = *((const struct cpg_address **) second);
559  if (a->nodeid < b->nodeid) {
560  return -1;
561  } else if (a->nodeid > b->nodeid) {
562  return 1;
563  }
564  /* don't bother with "reason" nor "pid" */
565  return 0;
566 }
567 
576 static const char *
577 cpgreason2str(cpg_reason_t reason)
578 {
579  switch (reason) {
580  case CPG_REASON_JOIN: return " via cpg_join";
581  case CPG_REASON_LEAVE: return " via cpg_leave";
582  case CPG_REASON_NODEDOWN: return " via cluster exit";
583  case CPG_REASON_NODEUP: return " via cluster join";
584  case CPG_REASON_PROCDOWN: return " for unknown reason";
585  default: break;
586  }
587  return "";
588 }
589 
598 static inline const char *
599 peer_name(const crm_node_t *peer)
600 {
601  if (peer == NULL) {
602  return "unknown node";
603  } else if (peer->uname == NULL) {
604  return "peer node";
605  } else {
606  return peer->uname;
607  }
608 }
609 
/*!
 * \internal
 * \brief Process one entry of a CPG "left" list
 *
 * Looks up the departed process's node in the node caches. If that node is
 * known and still has an entry with the same node ID in the current member
 * list, the departure is treated as a duplicate instance leaving and is only
 * logged as a warning. Otherwise the departure is logged and, for a known
 * peer, passed to crm_update_peer_proc().
 *
 * \param[in] cpg_group_name       Group name (used for logging)
 * \param[in] event_counter        Event number (used for logging)
 * \param[in] local_nodeid         Node ID of the local node
 * \param[in] cpg_peer             Process that left
 * \param[in] sorted_member_list   Current members, sorted by node ID (as
 *                                 required by the bsearch() below)
 * \param[in] member_list_entries  Number of entries in \p sorted_member_list
 */
621 static void
622 node_left(const char *cpg_group_name, int event_counter,
623  uint32_t local_nodeid, const struct cpg_address *cpg_peer,
624  const struct cpg_address **sorted_member_list,
625  size_t member_list_entries)
626 {
/* NOTE(review): the last argument of the pcmk__search_node_caches() call
 * below (inner line 629) is missing from this listing.
 */
627  crm_node_t *peer =
628  pcmk__search_node_caches(cpg_peer->nodeid, NULL,
630  const struct cpg_address **rival = NULL;
631 
632  /* Most CPG-related Pacemaker code assumes that only one process on a node
633  * can be in the process group, but Corosync does not impose this
634  * limitation, and more than one can be a member in practice due to a
635  * daemon attempting to start while another instance is already running.
636  *
637  * Check for any such duplicate instances, because we don't want to process
638  * their leaving as if our actual peer left. If the peer that left still has
639  * an entry in sorted_member_list (with a different PID), we will ignore the
640  * leaving.
641  *
642  * @TODO Track CPG members' PIDs so we can tell exactly who left.
643  */
644  if (peer != NULL) {
/* The key is the address of a pointer, matching the element type that
 * cmp_member_list_nodeid() dereferences.
 */
645  rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
646  sizeof(const struct cpg_address *),
647  cmp_member_list_nodeid);
648  }
649 
650  if (rival == NULL) {
// No other process from this node remains in the group
651  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
652  cpg_group_name, event_counter, peer_name(peer),
653  cpg_peer->nodeid, cpg_peer->pid,
654  cpgreason2str(cpg_peer->reason));
655  if (peer != NULL) {
/* NOTE(review): the last argument of this call (inner line 657) is missing
 * from this listing.
 */
656  crm_update_peer_proc(__func__, peer, crm_proc_cpg,
658  }
659  } else if (cpg_peer->nodeid == local_nodeid) {
660  crm_warn("Group %s event %d: duplicate local pid %u left%s",
661  cpg_group_name, event_counter,
662  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
663  } else {
664  crm_warn("Group %s event %d: "
665  "%s (node %u) duplicate pid %u left%s (%u remains)",
666  cpg_group_name, event_counter, peer_name(peer),
667  cpg_peer->nodeid, cpg_peer->pid,
668  cpgreason2str(cpg_peer->reason), (*rival)->pid);
669  }
670 }
671 
/*!
 * \internal
 * \brief Handle a CPG configuration change (membership) event
 *
 * Processes every entry of the left list via node_left(), logs every entry of
 * the joined list, and updates the peer cache for every current member. If
 * the local node is not among the current members, it is flagged as evicted.
 *
 * \param[in] handle               CPG connection the event arrived on
 * \param[in] group_name           Name of the CPG group
 * \param[in] member_list          Current group members
 * \param[in] member_list_entries  Number of entries in \p member_list
 * \param[in] left_list            Members that left
 * \param[in] left_list_entries    Number of entries in \p left_list
 * \param[in] joined_list          Members that joined
 * \param[in] joined_list_entries  Number of entries in \p joined_list
 */
688 void
689 pcmk__cpg_confchg_cb(cpg_handle_t handle,
690  const struct cpg_name *group_name,
691  const struct cpg_address *member_list,
692  size_t member_list_entries,
693  const struct cpg_address *left_list,
694  size_t left_list_entries,
695  const struct cpg_address *joined_list,
696  size_t joined_list_entries)
697 {
// Event number, used only to correlate the log messages of one event
698  static int counter = 0;
699 
700  bool found = false;
701  uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
702  const struct cpg_address **sorted = NULL;
703 
// Build an array of pointers into member_list that can be sorted freely
704  sorted = pcmk__assert_alloc(member_list_entries,
705  sizeof(const struct cpg_address *));
706 
707  for (size_t iter = 0; iter < member_list_entries; iter++) {
708  sorted[iter] = member_list + iter;
709  }
710 
711  // So that the cross-matching of multiply-subscribed nodes is then cheap
712  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
713  cmp_member_list_nodeid);
714 
715  for (int i = 0; i < left_list_entries; i++) {
716  node_left(group_name->value, counter, local_nodeid, &left_list[i],
717  sorted, member_list_entries);
718  }
719  free(sorted);
720  sorted = NULL;
721 
722  for (int i = 0; i < joined_list_entries; i++) {
723  crm_info("Group %s event %d: node %u pid %u joined%s",
724  group_name->value, counter, joined_list[i].nodeid,
725  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
726  }
727 
728  for (int i = 0; i < member_list_entries; i++) {
/* NOTE(review): the last argument of this pcmk__get_node() call (inner line
 * 730) is missing from this listing.
 */
729  crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
731 
732  if (member_list[i].nodeid == local_nodeid
733  && member_list[i].pid != getpid()) {
734  // See the note in node_left()
735  crm_warn("Group %s event %d: detected duplicate local pid %u",
736  group_name->value, counter, member_list[i].pid);
737  continue;
738  }
739  crm_info("Group %s event %d: %s (node %u pid %u) is member",
740  group_name->value, counter, peer_name(peer),
741  member_list[i].nodeid, member_list[i].pid);
742 
743  /* If the caller left auto-reaping enabled, this will also update the
744  * state to member.
745  */
/* NOTE(review): the last argument of this crm_update_peer_proc() call (inner
 * line 747) is missing from this listing.
 */
746  peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
748 
749  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
750  /* The node is a CPG member, but we currently think it's not a
751  * cluster member. This is possible only if auto-reaping was
752  * disabled. The node may be joining, and we happened to get the CPG
753  * notification before the quorum notification; or the node may have
754  * just died, and we are processing its final messages; or a bug
755  * has affected the peer cache.
756  */
757  time_t now = time(NULL);
758 
759  if (peer->when_lost == 0) {
760  // Track when we first got into this contradictory state
761  peer->when_lost = now;
762 
763  } else if (now > (peer->when_lost + 60)) {
764  // If it persists for more than a minute, update the state
765  crm_warn("Node %u is member of group %s but was believed "
766  "offline",
767  member_list[i].nodeid, group_name->value);
768  pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
769  }
770  }
771 
772  if (local_nodeid == member_list[i].nodeid) {
773  found = true;
774  }
775  }
776 
777  if (!found) {
// pcmk_cpg_dispatch() reports failure once this flag is set
778  crm_err("Local node was evicted from group %s", group_name->value);
779  cpg_evicted = true;
780  }
781 
782  counter++;
783 }
784 
793 int
794 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
795 {
796  if (cluster == NULL) {
797  return EINVAL;
798  }
799  cluster->cpg.cpg_deliver_fn = fn;
800  return pcmk_rc_ok;
801 }
802 
811 int
812 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
813 {
814  if (cluster == NULL) {
815  return EINVAL;
816  }
817  cluster->cpg.cpg_confchg_fn = fn;
818  return pcmk_rc_ok;
819 }
820 
/*!
 * \internal
 * \brief Connect to Corosync CPG
 *
 * Initializes a CPG connection using the callbacks stored in the cluster
 * object, checks that the CPG provider is authentic, records the local node
 * ID, joins the group named by pcmk__message_name(crm_system_name), and
 * registers the connection's file descriptor with the main loop.
 *
 * NOTE(review): the line holding this function's name and parameter list
 * (inner line 829) is missing from this listing; the body operates on a
 * pcmk_cluster_t pointer named "cluster".
 *
 * \return pcmk_rc_ok on success, or ENOTCONN if a step left a Corosync error
 *         code in rc
 */
828 int
830 {
831  cs_error_t rc;
832  int fd = -1;
833  int retries = 0;
834  uint32_t id = 0;
835  crm_node_t *peer = NULL;
836  cpg_handle_t handle = 0;
837  const char *message_name = pcmk__message_name(crm_system_name);
838  uid_t found_uid = 0;
839  gid_t found_gid = 0;
840  pid_t found_pid = 0;
841  int rv;
842 
843  struct mainloop_fd_callbacks cpg_fd_callbacks = {
844  .dispatch = pcmk_cpg_dispatch,
845  .destroy = cluster->destroy,
846  };
847 
848  cpg_model_v1_data_t cpg_model_info = {
849  .model = CPG_MODEL_V1,
850  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
851  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
852  .cpg_totem_confchg_fn = NULL,
853  .flags = 0,
854  };
855 
// Start from a clean state in case of a reconnect
856  cpg_evicted = false;
857  cluster->group.length = 0;
858  cluster->group.value[0] = 0;
859 
860  /* group.value is char[128] */
861  strncpy(cluster->group.value, message_name, 127);
862  cluster->group.value[127] = 0;
// The recorded length includes the terminating NUL
863  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
864 
865  cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
866  if (rc != CS_OK) {
867  crm_err("Could not connect to the CPG API: %s (%d)",
868  cs_strerror(rc), rc);
869  goto bail;
870  }
871 
872  rc = cpg_fd_get(handle, &fd);
873  if (rc != CS_OK) {
874  crm_err("Could not obtain the CPG API connection: %s (%d)",
875  cs_strerror(rc), rc);
876  goto bail;
877  }
878 
879  /* CPG provider run as root (in given user namespace, anyway)? */
880  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
881  &found_uid, &found_gid))) {
882  crm_err("CPG provider is not authentic:"
883  " process %lld (uid: %lld, gid: %lld)",
884  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
885  (long long) found_uid, (long long) found_gid);
886  rc = CS_ERR_ACCESS;
887  goto bail;
888  } else if (rv < 0) {
889  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
890  strerror(-rv), -rv);
891  rc = CS_ERR_ACCESS;
892  goto bail;
893  }
894 
895  id = pcmk__cpg_local_nodeid(handle);
896  if (id == 0) {
/* NOTE(review): rc still holds CS_OK from cpg_fd_get() here, so the bail path
 * below neither finalizes the handle nor returns ENOTCONN -- confirm whether
 * rc should be set to an error code before jumping.
 */
897  crm_err("Could not get local node id from the CPG API");
898  goto bail;
899 
900  }
901  cluster->nodeid = id;
902 
903  retries = 0;
904  cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
905  if (rc != CS_OK) {
906  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
907  goto bail;
908  }
909 
// Publish the handle only once the group has been joined
910  pcmk_cpg_handle = handle;
911  cluster->cpg_handle = handle;
912  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
913 
914  bail:
915  if (rc != CS_OK) {
916  cpg_finalize(handle);
917  // @TODO Map rc to more specific Pacemaker return code
918  return ENOTCONN;
919  }
920 
/* NOTE(review): the statement that follows this lookup (inner line 922) is
 * missing from this listing, so peer appears unused here.
 */
921  peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
923  return pcmk_rc_ok;
924 }
925 
932 void
934 {
935  pcmk_cpg_handle = 0;
936  if (cluster->cpg_handle != 0) {
937  crm_trace("Disconnecting CPG");
938  cpg_leave(cluster->cpg_handle, &cluster->group);
939  cpg_finalize(cluster->cpg_handle);
940  cluster->cpg_handle = 0;
941 
942  } else {
943  crm_info("No CPG connection");
944  }
945 }
946 
958 static bool
959 send_cpg_text(const char *data, bool local, const crm_node_t *node,
960  enum crm_ais_msg_types dest)
961 {
962  // @COMPAT Drop local argument when send_cluster_text is dropped
963  static int msg_id = 0;
964  static int local_pid = 0;
965  static int local_name_len = 0;
966  static const char *local_name = NULL;
967 
968  char *target = NULL;
969  struct iovec *iov;
970  pcmk__cpg_msg_t *msg = NULL;
971 
972  CRM_CHECK(dest != crm_msg_ais, return false);
973 
974  if (local_name == NULL) {
975  local_name = pcmk__cluster_local_node_name();
976  }
977  if ((local_name_len == 0) && (local_name != NULL)) {
978  local_name_len = strlen(local_name);
979  }
980 
981  if (data == NULL) {
982  data = "";
983  }
984 
985  if (local_pid == 0) {
986  local_pid = getpid();
987  }
988 
989  msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
990 
991  msg_id++;
992  msg->id = msg_id;
993  msg->header.id = crm_class_cluster;
994  msg->header.error = CS_OK;
995 
996  msg->host.type = dest;
997  msg->host.local = local;
998 
999  if (node != NULL) {
1000  if (node->uname != NULL) {
1001  target = pcmk__str_copy(node->uname);
1002  msg->host.size = strlen(node->uname);
1003  memset(msg->host.uname, 0, MAX_NAME);
1004  memcpy(msg->host.uname, node->uname, msg->host.size);
1005 
1006  } else {
1007  target = crm_strdup_printf("%u", node->id);
1008  }
1009  msg->host.id = node->id;
1010 
1011  } else {
1012  target = pcmk__str_copy("all");
1013  }
1014 
1015  msg->sender.id = 0;
1016  msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name);
1017  msg->sender.pid = local_pid;
1018  msg->sender.size = local_name_len;
1019  memset(msg->sender.uname, 0, MAX_NAME);
1020 
1021  if ((local_name != NULL) && (msg->sender.size != 0)) {
1022  memcpy(msg->sender.uname, local_name, msg->sender.size);
1023  }
1024 
1025  msg->size = 1 + strlen(data);
1026  msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
1027 
1028  if (msg->size < CRM_BZ2_THRESHOLD) {
1029  msg = pcmk__realloc(msg, msg->header.size);
1030  memcpy(msg->data, data, msg->size);
1031 
1032  } else {
1033  char *compressed = NULL;
1034  unsigned int new_size = 0;
1035 
1036  if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
1037  &new_size) == pcmk_rc_ok) {
1038 
1039  msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1040  msg = pcmk__realloc(msg, msg->header.size);
1041  memcpy(msg->data, compressed, new_size);
1042 
1043  msg->is_compressed = TRUE;
1044  msg->compressed_size = new_size;
1045 
1046  } else {
1047  // cppcheck seems not to understand the abort logic in pcmk__realloc
1048  // cppcheck-suppress memleak
1049  msg = pcmk__realloc(msg, msg->header.size);
1050  memcpy(msg->data, data, msg->size);
1051  }
1052 
1053  free(compressed);
1054  }
1055 
1056  iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1057  iov->iov_base = msg;
1058  iov->iov_len = msg->header.size;
1059 
1060  if (msg->compressed_size > 0) {
1061  crm_trace("Queueing CPG message %u to %s "
1062  "(%llu bytes, %d bytes compressed payload): %.200s",
1063  msg->id, target, (unsigned long long) iov->iov_len,
1064  msg->compressed_size, data);
1065  } else {
1066  crm_trace("Queueing CPG message %u to %s "
1067  "(%llu bytes, %d bytes payload): %.200s",
1068  msg->id, target, (unsigned long long) iov->iov_len,
1069  msg->size, data);
1070  }
1071 
1072  free(target);
1073 
1074  cs_message_queue = g_list_append(cs_message_queue, iov);
1075  crm_cs_flush(&pcmk_cpg_handle);
1076 
1077  return true;
1078 }
1079 
1090 bool
1091 pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
1092  enum crm_ais_msg_types dest)
1093 {
1094  bool rc = true;
1095  GString *data = g_string_sized_new(1024);
1096 
1097  pcmk__xml_string(msg, 0, data, 0);
1098 
1099  rc = send_cpg_text(data->str, false, node, dest);
1100  g_string_free(data, TRUE);
1101  return rc;
1102 }
1103 
1104 // Deprecated functions kept only for backward API compatibility
1105 // LCOV_EXCL_START
1106 
1107 #include <crm/cluster/compat.h>
1108 
/* Deprecated wrapper: reports whether pcmk__cpg_connect() returned
 * pcmk_rc_ok for the given cluster object.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * (inner line 1110) is missing from this listing -- restore it from the
 * original source.
 */
1109 gboolean
1111 {
1112  return pcmk__cpg_connect(cluster) == pcmk_rc_ok;
1113 }
1114 
/* Deprecated wrapper: forwards to pcmk__cpg_disconnect().
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * (inner line 1116) is missing from this listing -- restore it from the
 * original source.
 */
1115 void
1117 {
1118  pcmk__cpg_disconnect(cluster);
1119 }
1120 
1121 uint32_t
1122 get_local_nodeid(cpg_handle_t handle)
1123 {
1124  return pcmk__cpg_local_nodeid(handle);
1125 }
1126 
1127 void
1128 pcmk_cpg_membership(cpg_handle_t handle,
1129  const struct cpg_name *group_name,
1130  const struct cpg_address *member_list,
1131  size_t member_list_entries,
1132  const struct cpg_address *left_list,
1133  size_t left_list_entries,
1134  const struct cpg_address *joined_list,
1135  size_t joined_list_entries)
1136 {
1137  pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries,
1138  left_list, left_list_entries,
1139  joined_list, joined_list_entries);
1140 }
1141 
1142 gboolean
1143 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
1144  gboolean local, const crm_node_t *node,
1145  enum crm_ais_msg_types dest)
1146 {
1147  switch (msg_class) {
1148  case crm_class_cluster:
1149  return send_cpg_text(data, local, node, dest);
1150  default:
1151  crm_err("Invalid message class: %d", msg_class);
1152  return FALSE;
1153  }
1154 }
1155 
1156 char *
1157 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid,
1158  void *content, uint32_t *kind, const char **from)
1159 {
1160  return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from);
1161 }
1162 
1163 enum crm_ais_msg_types
1164 text2msg_type(const char *text)
1165 {
1166  int type = crm_msg_none;
1167 
1168  CRM_CHECK(text != NULL, return type);
1169  text = pcmk__message_name(text);
1170  if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1171  type = crm_msg_ais;
1172  } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1173  type = crm_msg_cib;
1174  } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1175  type = crm_msg_crmd;
1176  } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1177  type = crm_msg_te;
1178  } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1179  type = crm_msg_pe;
1180  } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1181  type = crm_msg_lrmd;
1182  } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1184  } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1186  } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1187  type = crm_msg_attrd;
1188 
1189  } else {
1190  /* This will normally be a transient client rather than
1191  * a cluster daemon. Set the type to the pid of the client
1192  */
1193  int scan_rc = sscanf(text, "%d", &type);
1194 
1195  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1196  /* Ensure it's sane */
1197  type = crm_msg_none;
1198  }
1199  }
1200  return type;
1201 }
1202 
1203 // LCOV_EXCL_STOP
1204 // End deprecated API
#define LOG_TRACE
Definition: logging.h:38
pcmk__cpg_host_t host
Definition: cpg.c:52
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:245
#define cs_repeat(rc, counter, max, code)
Definition: cpg.c:81
uint32_t size
Definition: cpg.c:52
#define crm_notice(fmt, args...)
Definition: logging.h:397
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:101
crm_node_t * pcmk__get_node(unsigned int id, const char *uname, const char *uuid, uint32_t flags)
Definition: membership.c:890
char data[0]
Definition: cpg.c:58
crm_ais_msg_types
Definition: cluster.h:197
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:958
#define CRM_BZ2_THRESHOLD
Definition: xml_io.h:35
uint32_t nodeid
Definition: cluster.h:142
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition: strings.c:1026
uint32_t id
Definition: cluster.h:120
pcmk__cpg_host_t sender
Definition: cpg.c:53
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition: xml_io.c:488
Search for cluster nodes from membership cache.
Definition: internal.h:37
void(* destroy)(gpointer)
Definition: cluster.h:146
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:61
bool pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:1091
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition: cpg.c:75
char * crm_system_name
Definition: utils.c:50
const char * pcmk__cluster_local_node_name(void)
Definition: cluster.c:325
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:689
#define CS_SEND_MAX
Definition: cpg.c:191
enum crm_ais_msg_types type
Definition: cpg.c:51
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:501
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:1157
Wrappers for and extensions to glib mainloop.
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG.
Definition: cpg.c:829
#define CRM_SYSTEM_DC
Definition: crm.h:87
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition: mainloop.h:148
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition: cpg.c:58
gboolean local
Definition: cpg.c:50
#define crm_warn(fmt, args...)
Definition: logging.h:394
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1278
uint32_t pid
Definition: cpg.c:49
#define crm_debug(fmt, args...)
Definition: logging.h:402
time_t when_lost
Definition: cluster.h:121
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:432
#define crm_trace(fmt, args...)
Definition: logging.h:404
#define do_crm_log(level, fmt, args...)
Log a message.
Definition: logging.h:181
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:1059
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1, 2)
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, const crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:1143
#define CRM_SYSTEM_PENGINE
Definition: crm.h:92
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object.
Definition: cpg.c:794
Wrappers for and extensions to libxml2.
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:837
#define CRM_NODE_MEMBER
Definition: cluster.h:49
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object.
Definition: cpg.c:812
#define pcmk__str_copy(str)
uint32_t id
Definition: cpg.c:48
struct pcmk__cpg_host_s __attribute__((packed))
Deprecated Pacemaker cluster API.
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition: crm.h:79
uint32_t compressed_size
Definition: cpg.c:56
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: messages.c:171
#define CRM_SYSTEM_CRMD
Definition: crm.h:90
const char * target
Definition: pcmk_fence.c:29
crm_ais_msg_class
Definition: cluster.h:189
#define CRM_XS
Definition: logging.h:56
#define PCMK_VALUE_ONLINE
Definition: options.h:184
gboolean cluster_connect_cpg(pcmk_cluster_t *cluster)
Definition: cpg.c:1110
#define CRM_SYSTEM_STONITHD
Definition: crm.h:94
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition: results.c:906
#define CRM_SYSTEM_CIB
Definition: crm.h:89
#define CRM_SYSTEM_TENGINE
Definition: crm.h:93
gboolean is_compressed
Definition: cpg.c:50
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
Definition: cpg.c:933
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:1122
crm_node_t * pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
Definition: membership.c:765
#define crm_err(fmt, args...)
Definition: logging.h:391
enum crm_ais_msg_types pcmk__cluster_parse_msg_type(const char *text)
Definition: cluster.c:44
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:192
#define CRM_ASSERT(expr)
Definition: results.h:42
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:1164
#define CRM_SYSTEM_LRMD
Definition: crm.h:91
char uname[MAX_NAME]
Definition: cpg.c:53
char * state
Definition: cluster.h:109
#define msg_data_len(msg)
Definition: cpg.c:79
#define pcmk__plural_s(i)
void cluster_disconnect_cpg(pcmk_cluster_t *cluster)
Definition: cpg.c:1116
IPC interface to Pacemaker daemons.
char * uname
Definition: cluster.h:88
#define pcmk__assert_alloc(nmemb, size)
Definition: internal.h:297
#define PCMK_VALUE_OFFLINE
Definition: options.h:183
#define crm_info(fmt, args...)
Definition: logging.h:399
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:1128
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
Definition: ipc_client.c:1580