pacemaker  2.1.2-ada5c3b36
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2021 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipc_common.h>
23 #include <qb/qbipcc.h>
24 #include <qb/qbutil.h>
25 
26 #include <corosync/corodefs.h>
27 #include <corosync/corotypes.h>
28 #include <corosync/hdb.h>
29 #include <corosync/cpg.h>
30 
31 #include <crm/msg_xml.h>
32 
33 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
34 #include "crmcluster_private.h"
35 
36 /* @TODO Once we can update the public API to require crm_cluster_t* in more
37  * functions, we can ditch this in favor of cluster->cpg_handle.
38  */
39 static cpg_handle_t pcmk_cpg_handle = 0;
40 
41 // @TODO These could be moved to crm_cluster_t* at that time as well
42 static bool cpg_evicted = false;
43 static GList *cs_message_queue = NULL;
44 static int cs_message_timer = 0;
45 
46 struct pcmk__cpg_host_s {
47  uint32_t id;
48  uint32_t pid;
49  gboolean local;
51  uint32_t size;
52  char uname[MAX_NAME];
53 } __attribute__ ((packed));
54 
55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56 
57 struct pcmk__cpg_msg_s {
58  struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59  uint32_t id;
60  gboolean is_compressed;
61 
64 
65  uint32_t size;
66  uint32_t compressed_size;
67  /* 584 bytes */
68  char data[0];
69 
70 } __attribute__ ((packed));
71 
72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73 
74 static void crm_cs_flush(gpointer data);
75 
/*
 * Number of payload bytes actually carried by a message: the compressed size
 * if the payload is compressed, otherwise the plain size.
 *
 * msg may be any expression yielding a pointer to a message.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)

/*
 * Evaluate a Corosync call, retrying while it reports CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL.
 *
 * rc       lvalue that receives the result of each attempt
 * counter  lvalue counting failed attempts (caller resets it before use);
 *          also the number of seconds slept before the next attempt
 * max      give up once counter reaches this value
 * code     expression to evaluate (re-evaluated on every attempt)
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
88 
94 void
96 {
97  pcmk_cpg_handle = 0;
98  if (cluster->cpg_handle) {
99  crm_trace("Disconnecting CPG");
100  cpg_leave(cluster->cpg_handle, &cluster->group);
101  cpg_finalize(cluster->cpg_handle);
102  cluster->cpg_handle = 0;
103 
104  } else {
105  crm_info("No CPG connection");
106  }
107 }
108 
116 uint32_t
117 get_local_nodeid(cpg_handle_t handle)
118 {
119  cs_error_t rc = CS_OK;
120  int retries = 0;
121  static uint32_t local_nodeid = 0;
122  cpg_handle_t local_handle = handle;
123  cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
124  int fd = -1;
125  uid_t found_uid = 0;
126  gid_t found_gid = 0;
127  pid_t found_pid = 0;
128  int rv;
129 
130  if(local_nodeid != 0) {
131  return local_nodeid;
132  }
133 
134  if(handle == 0) {
135  crm_trace("Creating connection");
136  cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
137  if (rc != CS_OK) {
138  crm_err("Could not connect to the CPG API: %s (%d)",
139  cs_strerror(rc), rc);
140  return 0;
141  }
142 
143  rc = cpg_fd_get(local_handle, &fd);
144  if (rc != CS_OK) {
145  crm_err("Could not obtain the CPG API connection: %s (%d)",
146  cs_strerror(rc), rc);
147  goto bail;
148  }
149 
150  /* CPG provider run as root (in given user namespace, anyway)? */
151  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152  &found_uid, &found_gid))) {
153  crm_err("CPG provider is not authentic:"
154  " process %lld (uid: %lld, gid: %lld)",
155  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156  (long long) found_uid, (long long) found_gid);
157  goto bail;
158  } else if (rv < 0) {
159  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160  strerror(-rv), -rv);
161  goto bail;
162  }
163  }
164 
165  if (rc == CS_OK) {
166  retries = 0;
167  crm_trace("Performing lookup");
168  cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169  }
170 
171  if (rc != CS_OK) {
172  crm_err("Could not get local node id from the CPG API: %s (%d)",
173  pcmk__cs_err_str(rc), rc);
174  }
175 
176 bail:
177  if(handle == 0) {
178  crm_trace("Closing connection");
179  cpg_finalize(local_handle);
180  }
181  crm_debug("Local nodeid is %u", local_nodeid);
182  return local_nodeid;
183 }
184 
193 static gboolean
194 crm_cs_flush_cb(gpointer data)
195 {
196  cs_message_timer = 0;
197  crm_cs_flush(data);
198  return FALSE;
199 }
200 
201 // Send no more than this many CPG messages in one flush
202 #define CS_SEND_MAX 200
203 
210 static void
211 crm_cs_flush(gpointer data)
212 {
213  unsigned int sent = 0;
214  guint queue_len = 0;
215  cs_error_t rc = 0;
216  cpg_handle_t *handle = (cpg_handle_t *) data;
217 
218  if (*handle == 0) {
219  crm_trace("Connection is dead");
220  return;
221  }
222 
223  queue_len = g_list_length(cs_message_queue);
224  if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225  crm_err("CPG queue has grown to %d", queue_len);
226 
227  } else if (queue_len == CS_SEND_MAX) {
228  crm_warn("CPG queue has grown to %d", queue_len);
229  }
230 
231  if (cs_message_timer != 0) {
232  /* There is already a timer, wait until it goes off */
233  crm_trace("Timer active %d", cs_message_timer);
234  return;
235  }
236 
237  while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238  struct iovec *iov = cs_message_queue->data;
239 
240  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241  if (rc != CS_OK) {
242  break;
243  }
244 
245  sent++;
246  crm_trace("CPG message sent, size=%llu",
247  (unsigned long long) iov->iov_len);
248 
249  cs_message_queue = g_list_remove(cs_message_queue, iov);
250  free(iov->iov_base);
251  free(iov);
252  }
253 
254  queue_len -= sent;
255  do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
256  "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
257  sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
258  (int) rc);
259 
260  if (cs_message_queue) {
261  uint32_t delay_ms = 100;
262  if (rc != CS_OK) {
263  /* Proportionally more if sending failed but cap at 1s */
264  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
265  }
266  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
267  }
268 }
269 
278 static int
279 pcmk_cpg_dispatch(gpointer user_data)
280 {
281  cs_error_t rc = CS_OK;
282  crm_cluster_t *cluster = (crm_cluster_t *) user_data;
283 
284  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
285  if (rc != CS_OK) {
286  crm_err("Connection to the CPG API failed: %s (%d)",
287  pcmk__cs_err_str(rc), rc);
288  cpg_finalize(cluster->cpg_handle);
289  cluster->cpg_handle = 0;
290  return -1;
291 
292  } else if (cpg_evicted) {
293  crm_err("Evicted from CPG membership");
294  return -1;
295  }
296  return 0;
297 }
298 
299 static inline const char *
300 ais_dest(const pcmk__cpg_host_t *host)
301 {
302  if (host->local) {
303  return "local";
304  } else if (host->size > 0) {
305  return host->uname;
306  } else {
307  return "<all>";
308  }
309 }
310 
311 static inline const char *
312 msg_type2text(enum crm_ais_msg_types type)
313 {
314  const char *text = "unknown";
315 
316  switch (type) {
317  case crm_msg_none:
318  text = "unknown";
319  break;
320  case crm_msg_ais:
321  text = "ais";
322  break;
323  case crm_msg_cib:
324  text = "cib";
325  break;
326  case crm_msg_crmd:
327  text = "crmd";
328  break;
329  case crm_msg_pe:
330  text = "pengine";
331  break;
332  case crm_msg_te:
333  text = "tengine";
334  break;
335  case crm_msg_lrmd:
336  text = "lrmd";
337  break;
338  case crm_msg_attrd:
339  text = "attrd";
340  break;
341  case crm_msg_stonithd:
342  text = "stonithd";
343  break;
344  case crm_msg_stonith_ng:
345  text = "stonith-ng";
346  break;
347  }
348  return text;
349 }
350 
359 static bool
360 check_message_sanity(const pcmk__cpg_msg_t *msg)
361 {
362  int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
363 
364  if (payload_size < 1) {
365  crm_err("%sCPG message %d from %s invalid: "
366  "Claimed size of %d bytes is too small "
367  CRM_XS " from %s[%u] to %s@%s",
368  (msg->is_compressed? "Compressed " : ""),
369  msg->id, ais_dest(&(msg->sender)),
370  (int) msg->header.size,
371  msg_type2text(msg->sender.type), msg->sender.pid,
372  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
373  return false;
374  }
375 
376  if (msg->header.error != CS_OK) {
377  crm_err("%sCPG message %d from %s invalid: "
378  "Sender indicated error %d "
379  CRM_XS " from %s[%u] to %s@%s",
380  (msg->is_compressed? "Compressed " : ""),
381  msg->id, ais_dest(&(msg->sender)),
382  msg->header.error,
383  msg_type2text(msg->sender.type), msg->sender.pid,
384  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
385  return false;
386  }
387 
388  if (msg_data_len(msg) != payload_size) {
389  crm_err("%sCPG message %d from %s invalid: "
390  "Total size %d inconsistent with payload size %d "
391  CRM_XS " from %s[%u] to %s@%s",
392  (msg->is_compressed? "Compressed " : ""),
393  msg->id, ais_dest(&(msg->sender)),
394  (int) msg->header.size, (int) msg_data_len(msg),
395  msg_type2text(msg->sender.type), msg->sender.pid,
396  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
397  return false;
398  }
399 
400  if (!msg->is_compressed &&
401  /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
402  * but checking the last byte or two should be quick
403  */
404  (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
405  || (msg->data[msg->size - 1] != '\0'))) {
406  crm_err("CPG message %d from %s invalid: "
407  "Payload does not end at byte %llu "
408  CRM_XS " from %s[%u] to %s@%s",
409  msg->id, ais_dest(&(msg->sender)),
410  (unsigned long long) msg->size,
411  msg_type2text(msg->sender.type), msg->sender.pid,
412  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
413  return false;
414  }
415 
416  crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
417  (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
418  msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
419  ais_dest(&(msg->sender)),
420  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
421  return true;
422 }
423 
440 char *
441 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
442  uint32_t *kind, const char **from)
443 {
444  char *data = NULL;
445  pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
446 
447  if(handle) {
448  // Do filtering and field massaging
449  uint32_t local_nodeid = get_local_nodeid(handle);
450  const char *local_name = get_local_node_name();
451 
452  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
453  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
454  return NULL;
455 
456  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
457  /* Not for us */
458  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
459  return NULL;
460  } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
461  /* Not for us */
462  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
463  return NULL;
464  }
465 
466  msg->sender.id = nodeid;
467  if (msg->sender.size == 0) {
468  crm_node_t *peer = crm_get_peer(nodeid, NULL);
469 
470  if (peer == NULL) {
471  crm_err("Peer with nodeid=%u is unknown", nodeid);
472 
473  } else if (peer->uname == NULL) {
474  crm_err("No uname for peer with nodeid=%u", nodeid);
475 
476  } else {
477  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
478  msg->sender.size = strlen(peer->uname);
479  memset(msg->sender.uname, 0, MAX_NAME);
480  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
481  }
482  }
483  }
484 
485  crm_trace("Got new%s message (size=%d, %d, %d)",
486  msg->is_compressed ? " compressed" : "",
487  msg_data_len(msg), msg->size, msg->compressed_size);
488 
489  if (kind != NULL) {
490  *kind = msg->header.id;
491  }
492  if (from != NULL) {
493  *from = msg->sender.uname;
494  }
495 
496  if (msg->is_compressed && msg->size > 0) {
497  int rc = BZ_OK;
498  char *uncompressed = NULL;
499  unsigned int new_size = msg->size + 1;
500 
501  if (!check_message_sanity(msg)) {
502  goto badmsg;
503  }
504 
505  crm_trace("Decompressing message data");
506  uncompressed = calloc(1, new_size);
507  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
508 
509  if (rc != BZ_OK) {
510  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
511  bz2_strerror(rc), rc);
512  free(uncompressed);
513  goto badmsg;
514  }
515 
516  CRM_ASSERT(rc == BZ_OK);
517  CRM_ASSERT(new_size == msg->size);
518 
519  data = uncompressed;
520 
521  } else if (!check_message_sanity(msg)) {
522  goto badmsg;
523 
524  } else {
525  data = strdup(msg->data);
526  }
527 
528  // Is this necessary?
529  crm_get_peer(msg->sender.id, msg->sender.uname);
530 
531  crm_trace("Payload: %.200s", data);
532  return data;
533 
534  badmsg:
535  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
536  " min=%d, total=%d, size=%d, bz2_size=%d",
537  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
538  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
539  msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
540  msg->header.size, msg->size, msg->compressed_size);
541 
542  free(data);
543  return NULL;
544 }
545 
557 static int
558 cmp_member_list_nodeid(const void *first, const void *second)
559 {
560  const struct cpg_address *const a = *((const struct cpg_address **) first),
561  *const b = *((const struct cpg_address **) second);
562  if (a->nodeid < b->nodeid) {
563  return -1;
564  } else if (a->nodeid > b->nodeid) {
565  return 1;
566  }
567  /* don't bother with "reason" nor "pid" */
568  return 0;
569 }
570 
579 static const char *
580 cpgreason2str(cpg_reason_t reason)
581 {
582  switch (reason) {
583  case CPG_REASON_JOIN: return " via cpg_join";
584  case CPG_REASON_LEAVE: return " via cpg_leave";
585  case CPG_REASON_NODEDOWN: return " via cluster exit";
586  case CPG_REASON_NODEUP: return " via cluster join";
587  case CPG_REASON_PROCDOWN: return " for unknown reason";
588  default: break;
589  }
590  return "";
591 }
592 
601 static inline const char *
602 peer_name(crm_node_t *peer)
603 {
604  if (peer == NULL) {
605  return "unknown node";
606  } else if (peer->uname == NULL) {
607  return "peer node";
608  } else {
609  return peer->uname;
610  }
611 }
612 
624 static void
625 node_left(const char *cpg_group_name, int event_counter,
626  uint32_t local_nodeid, const struct cpg_address *cpg_peer,
627  const struct cpg_address **sorted_member_list,
628  size_t member_list_entries)
629 {
630  crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
631  NULL);
632  const struct cpg_address **rival = NULL;
633 
634  /* Most CPG-related Pacemaker code assumes that only one process on a node
635  * can be in the process group, but Corosync does not impose this
636  * limitation, and more than one can be a member in practice due to a
637  * daemon attempting to start while another instance is already running.
638  *
639  * Check for any such duplicate instances, because we don't want to process
640  * their leaving as if our actual peer left. If the peer that left still has
641  * an entry in sorted_member_list (with a different PID), we will ignore the
642  * leaving.
643  *
644  * @TODO Track CPG members' PIDs so we can tell exactly who left.
645  */
646  if (peer != NULL) {
647  rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
648  sizeof(const struct cpg_address *),
649  cmp_member_list_nodeid);
650  }
651 
652  if (rival == NULL) {
653  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
654  cpg_group_name, event_counter, peer_name(peer),
655  cpg_peer->nodeid, cpg_peer->pid,
656  cpgreason2str(cpg_peer->reason));
657  if (peer != NULL) {
658  crm_update_peer_proc(__func__, peer, crm_proc_cpg,
659  OFFLINESTATUS);
660  }
661  } else if (cpg_peer->nodeid == local_nodeid) {
662  crm_warn("Group %s event %d: duplicate local pid %u left%s",
663  cpg_group_name, event_counter,
664  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
665  } else {
666  crm_warn("Group %s event %d: "
667  "%s (node %u) duplicate pid %u left%s (%u remains)",
668  cpg_group_name, event_counter, peer_name(peer),
669  cpg_peer->nodeid, cpg_peer->pid,
670  cpgreason2str(cpg_peer->reason), (*rival)->pid);
671  }
672 }
673 
686 void
687 pcmk_cpg_membership(cpg_handle_t handle,
688  const struct cpg_name *groupName,
689  const struct cpg_address *member_list, size_t member_list_entries,
690  const struct cpg_address *left_list, size_t left_list_entries,
691  const struct cpg_address *joined_list, size_t joined_list_entries)
692 {
693  int i;
694  gboolean found = FALSE;
695  static int counter = 0;
696  uint32_t local_nodeid = get_local_nodeid(handle);
697  const struct cpg_address **sorted;
698 
699  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
700  CRM_ASSERT(sorted != NULL);
701 
702  for (size_t iter = 0; iter < member_list_entries; iter++) {
703  sorted[iter] = member_list + iter;
704  }
705  /* so that the cross-matching multiply-subscribed nodes is then cheap */
706  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
707  cmp_member_list_nodeid);
708 
709  for (i = 0; i < left_list_entries; i++) {
710  node_left(groupName->value, counter, local_nodeid, &left_list[i],
711  sorted, member_list_entries);
712  }
713  free(sorted);
714  sorted = NULL;
715 
716  for (i = 0; i < joined_list_entries; i++) {
717  crm_info("Group %s event %d: node %u pid %u joined%s",
718  groupName->value, counter, joined_list[i].nodeid,
719  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
720  }
721 
722  for (i = 0; i < member_list_entries; i++) {
723  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
724 
725  if (member_list[i].nodeid == local_nodeid
726  && member_list[i].pid != getpid()) {
727  // See the note in node_left()
728  crm_warn("Group %s event %d: detected duplicate local pid %u",
729  groupName->value, counter, member_list[i].pid);
730  continue;
731  }
732  crm_info("Group %s event %d: %s (node %u pid %u) is member",
733  groupName->value, counter, peer_name(peer),
734  member_list[i].nodeid, member_list[i].pid);
735 
736  /* If the caller left auto-reaping enabled, this will also update the
737  * state to member.
738  */
739  peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
740  ONLINESTATUS);
741 
742  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
743  /* The node is a CPG member, but we currently think it's not a
744  * cluster member. This is possible only if auto-reaping was
745  * disabled. The node may be joining, and we happened to get the CPG
746  * notification before the quorum notification; or the node may have
747  * just died, and we are processing its final messages; or a bug
748  * has affected the peer cache.
749  */
750  time_t now = time(NULL);
751 
752  if (peer->when_lost == 0) {
753  // Track when we first got into this contradictory state
754  peer->when_lost = now;
755 
756  } else if (now > (peer->when_lost + 60)) {
757  // If it persists for more than a minute, update the state
758  crm_warn("Node %u is member of group %s but was believed offline",
759  member_list[i].nodeid, groupName->value);
760  pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
761  }
762  }
763 
764  if (local_nodeid == member_list[i].nodeid) {
765  found = TRUE;
766  }
767  }
768 
769  if (!found) {
770  crm_err("Local node was evicted from group %s", groupName->value);
771  cpg_evicted = true;
772  }
773 
774  counter++;
775 }
776 
784 gboolean
786 {
787  cs_error_t rc;
788  int fd = -1;
789  int retries = 0;
790  uint32_t id = 0;
791  crm_node_t *peer = NULL;
792  cpg_handle_t handle = 0;
793  const char *message_name = pcmk__message_name(crm_system_name);
794  uid_t found_uid = 0;
795  gid_t found_gid = 0;
796  pid_t found_pid = 0;
797  int rv;
798 
799  struct mainloop_fd_callbacks cpg_fd_callbacks = {
800  .dispatch = pcmk_cpg_dispatch,
801  .destroy = cluster->destroy,
802  };
803 
804  cpg_model_v1_data_t cpg_model_info = {
805  .model = CPG_MODEL_V1,
806  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
807  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
808  .cpg_totem_confchg_fn = NULL,
809  .flags = 0,
810  };
811 
812  cpg_evicted = false;
813  cluster->group.length = 0;
814  cluster->group.value[0] = 0;
815 
816  /* group.value is char[128] */
817  strncpy(cluster->group.value, message_name, 127);
818  cluster->group.value[127] = 0;
819  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
820 
821  cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
822  if (rc != CS_OK) {
823  crm_err("Could not connect to the CPG API: %s (%d)",
824  cs_strerror(rc), rc);
825  goto bail;
826  }
827 
828  rc = cpg_fd_get(handle, &fd);
829  if (rc != CS_OK) {
830  crm_err("Could not obtain the CPG API connection: %s (%d)",
831  cs_strerror(rc), rc);
832  goto bail;
833  }
834 
835  /* CPG provider run as root (in given user namespace, anyway)? */
836  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
837  &found_uid, &found_gid))) {
838  crm_err("CPG provider is not authentic:"
839  " process %lld (uid: %lld, gid: %lld)",
840  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
841  (long long) found_uid, (long long) found_gid);
842  rc = CS_ERR_ACCESS;
843  goto bail;
844  } else if (rv < 0) {
845  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
846  strerror(-rv), -rv);
847  rc = CS_ERR_ACCESS;
848  goto bail;
849  }
850 
851  id = get_local_nodeid(handle);
852  if (id == 0) {
853  crm_err("Could not get local node id from the CPG API");
854  goto bail;
855 
856  }
857  cluster->nodeid = id;
858 
859  retries = 0;
860  cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
861  if (rc != CS_OK) {
862  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
863  goto bail;
864  }
865 
866  pcmk_cpg_handle = handle;
867  cluster->cpg_handle = handle;
868  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
869 
870  bail:
871  if (rc != CS_OK) {
872  cpg_finalize(handle);
873  return FALSE;
874  }
875 
876  peer = crm_get_peer(id, NULL);
878  return TRUE;
879 }
880 
891 gboolean
892 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
893 {
894  gboolean rc = TRUE;
895  char *data = NULL;
896 
897  data = dump_xml_unformatted(msg);
898  rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
899  free(data);
900  return rc;
901 }
902 
915 gboolean
916 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
917  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
918 {
919  static int msg_id = 0;
920  static int local_pid = 0;
921  static int local_name_len = 0;
922  static const char *local_name = NULL;
923 
924  char *target = NULL;
925  struct iovec *iov;
926  pcmk__cpg_msg_t *msg = NULL;
928 
929  switch (msg_class) {
930  case crm_class_cluster:
931  break;
932  default:
933  crm_err("Invalid message class: %d", msg_class);
934  return FALSE;
935  }
936 
937  CRM_CHECK(dest != crm_msg_ais, return FALSE);
938 
939  if (local_name == NULL) {
940  local_name = get_local_node_name();
941  }
942  if ((local_name_len == 0) && (local_name != NULL)) {
943  local_name_len = strlen(local_name);
944  }
945 
946  if (data == NULL) {
947  data = "";
948  }
949 
950  if (local_pid == 0) {
951  local_pid = getpid();
952  }
953 
954  if (sender == crm_msg_none) {
955  sender = local_pid;
956  }
957 
958  msg = calloc(1, sizeof(pcmk__cpg_msg_t));
959 
960  msg_id++;
961  msg->id = msg_id;
962  msg->header.id = msg_class;
963  msg->header.error = CS_OK;
964 
965  msg->host.type = dest;
966  msg->host.local = local;
967 
968  if (node) {
969  if (node->uname) {
970  target = strdup(node->uname);
971  msg->host.size = strlen(node->uname);
972  memset(msg->host.uname, 0, MAX_NAME);
973  memcpy(msg->host.uname, node->uname, msg->host.size);
974  } else {
975  target = crm_strdup_printf("%u", node->id);
976  }
977  msg->host.id = node->id;
978  } else {
979  target = strdup("all");
980  }
981 
982  msg->sender.id = 0;
983  msg->sender.type = sender;
984  msg->sender.pid = local_pid;
985  msg->sender.size = local_name_len;
986  memset(msg->sender.uname, 0, MAX_NAME);
987  if ((local_name != NULL) && (msg->sender.size != 0)) {
988  memcpy(msg->sender.uname, local_name, msg->sender.size);
989  }
990 
991  msg->size = 1 + strlen(data);
992  msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
993 
994  if (msg->size < CRM_BZ2_THRESHOLD) {
995  msg = pcmk__realloc(msg, msg->header.size);
996  memcpy(msg->data, data, msg->size);
997 
998  } else {
999  char *compressed = NULL;
1000  unsigned int new_size = 0;
1001  char *uncompressed = strdup(data);
1002 
1003  if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
1004  &compressed, &new_size) == pcmk_rc_ok) {
1005 
1006  msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1007  msg = pcmk__realloc(msg, msg->header.size);
1008  memcpy(msg->data, compressed, new_size);
1009 
1010  msg->is_compressed = TRUE;
1011  msg->compressed_size = new_size;
1012 
1013  } else {
1014  // cppcheck seems not to understand the abort logic in pcmk__realloc
1015  // cppcheck-suppress memleak
1016  msg = pcmk__realloc(msg, msg->header.size);
1017  memcpy(msg->data, data, msg->size);
1018  }
1019 
1020  free(uncompressed);
1021  free(compressed);
1022  }
1023 
1024  iov = calloc(1, sizeof(struct iovec));
1025  iov->iov_base = msg;
1026  iov->iov_len = msg->header.size;
1027 
1028  if (msg->compressed_size) {
1029  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1030  msg->id, target, (unsigned long long) iov->iov_len,
1031  msg->compressed_size, data);
1032  } else {
1033  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1034  msg->id, target, (unsigned long long) iov->iov_len,
1035  msg->size, data);
1036  }
1037  free(target);
1038 
1039  cs_message_queue = g_list_append(cs_message_queue, iov);
1040  crm_cs_flush(&pcmk_cpg_handle);
1041 
1042  return TRUE;
1043 }
1044 
1052 enum crm_ais_msg_types
1053 text2msg_type(const char *text)
1054 {
1055  int type = crm_msg_none;
1056 
1057  CRM_CHECK(text != NULL, return type);
1058  text = pcmk__message_name(text);
1059  if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1060  type = crm_msg_ais;
1061  } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1062  type = crm_msg_cib;
1063  } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1064  type = crm_msg_crmd;
1065  } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1066  type = crm_msg_te;
1067  } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1068  type = crm_msg_pe;
1069  } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1070  type = crm_msg_lrmd;
1071  } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1073  } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1075  } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1076  type = crm_msg_attrd;
1077 
1078  } else {
1079  /* This will normally be a transient client rather than
1080  * a cluster daemon. Set the type to the pid of the client
1081  */
1082  int scan_rc = sscanf(text, "%d", &type);
1083 
1084  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1085  /* Ensure it's sane */
1086  type = crm_msg_none;
1087  }
1088  }
1089  return type;
1090 }
#define LOG_TRACE
Definition: logging.h:36
pcmk__cpg_host_t host
Definition: cpg.c:49
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:225
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: messages.c:182
#define cs_repeat(rc, counter, max, code)
Definition: cpg.c:78
uint32_t size
Definition: cpg.c:49
#define crm_notice(fmt, args...)
Definition: logging.h:359
const char * bz2_strerror(int rc)
Definition: results.c:776
char data[0]
Definition: cpg.c:55
crm_ais_msg_types
Definition: cluster.h:103
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:975
uint32_t nodeid
Definition: cluster.h:80
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition: strings.c:931
uint32_t id
Definition: cluster.h:66
pcmk__cpg_host_t sender
Definition: cpg.c:50
const char * get_local_node_name(void)
Get the local node's name.
Definition: cluster.c:155
void(* destroy)(gpointer)
Definition: cluster.h:82
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:53
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition: cpg.c:72
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry.
Definition: membership.c:702
char * crm_system_name
Definition: utils.c:54
#define CS_SEND_MAX
Definition: cpg.c:202
enum crm_ais_msg_types type
Definition: cpg.c:48
char * strerror(int errnum)
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:916
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message.
Definition: cpg.c:441
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:102
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG.
Definition: cpg.c:95
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition: mainloop.h:137
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition: cpg.c:55
gboolean local
Definition: cpg.c:47
#define crm_warn(fmt, args...)
Definition: logging.h:358
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1079
int rc
Definition: pcmk_fence.c:35
uint32_t pid
Definition: cpg.c:46
#define crm_debug(fmt, args...)
Definition: logging.h:362
time_t when_lost
Definition: cluster.h:67
#define crm_trace(fmt, args...)
Definition: logging.h:363
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
Definition: membership.c:562
#define do_crm_log(level, fmt, args...)
Log a message.
Definition: logging.h:166
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:876
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define CRM_SYSTEM_PENGINE
Definition: crm.h:108
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:749
#define CRM_NODE_MEMBER
Definition: cluster.h:33
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:892
uint32_t id
Definition: cpg.c:45
struct pcmk__cpg_host_s __attribute__((packed))
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition: crm.h:76
uint32_t compressed_size
Definition: cpg.c:53
#define CRM_SYSTEM_CRMD
Definition: crm.h:106
const char * target
Definition: pcmk_fence.c:29
crm_ais_msg_class
Definition: cluster.h:99
#define CRM_XS
Definition: logging.h:54
#define CRM_SYSTEM_STONITHD
Definition: crm.h:110
#define CRM_SYSTEM_CIB
Definition: crm.h:105
#define CRM_SYSTEM_TENGINE
Definition: crm.h:109
gboolean is_compressed
Definition: cpg.c:47
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG)
Definition: cpg.c:117
#define crm_err(fmt, args...)
Definition: logging.h:357
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:181
#define CRM_ASSERT(expr)
Definition: results.h:42
#define OFFLINESTATUS
Definition: util.h:39
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string.
Definition: cpg.c:1053
#define CRM_BZ2_THRESHOLD
Definition: xml.h:47
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2013
#define CRM_SYSTEM_LRMD
Definition: crm.h:107
char uname[MAX_NAME]
Definition: cpg.c:50
char * state
Definition: cluster.h:55
#define msg_data_len(msg)
Definition: cpg.c:76
#define pcmk__plural_s(i)
IPC interface to Pacemaker daemons.
char * uname
Definition: cluster.h:53
#define ONLINESTATUS
Definition: util.h:38
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event.
Definition: cpg.c:687
#define crm_info(fmt, args...)
Definition: logging.h:360
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG.
Definition: cpg.c:785
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
Definition: ipc_client.c:1382