pacemaker  2.1.0-7c3f660
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2020 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipc_common.h>
23 #include <qb/qbipcc.h>
24 #include <qb/qbutil.h>
25 
26 #include <corosync/corodefs.h>
27 #include <corosync/corotypes.h>
28 #include <corosync/hdb.h>
29 #include <corosync/cpg.h>
30 
31 #include <crm/msg_xml.h>
32 
33 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
34 #include "crmcluster_private.h"
35 
36 /* @TODO Once we can update the public API to require crm_cluster_t* in more
37  * functions, we can ditch this in favor of cluster->cpg_handle.
38  */
39 static cpg_handle_t pcmk_cpg_handle = 0;
40 
41 // @TODO These could be moved to crm_cluster_t* at that time as well
42 static bool cpg_evicted = false;
43 static GList *cs_message_queue = NULL;
44 static int cs_message_timer = 0;
45 
46 struct pcmk__cpg_host_s {
47  uint32_t id;
48  uint32_t pid;
49  gboolean local;
51  uint32_t size;
52  char uname[MAX_NAME];
53 } __attribute__ ((packed));
54 
55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56 
57 struct pcmk__cpg_msg_s {
58  struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59  uint32_t id;
60  gboolean is_compressed;
61 
64 
65  uint32_t size;
66  uint32_t compressed_size;
67  /* 584 bytes */
68  char data[0];
69 
70 } __attribute__ ((packed));
71 
72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73 
74 static void crm_cs_flush(gpointer data);
75 
/* Number of payload bytes a CPG message carries on the wire: the compressed
 * size if the message is compressed, otherwise the plain size. The argument is
 * parenthesized so that any pointer expression (e.g. &local_msg) can be passed.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
77 
/* Evaluate 'code' (an expression yielding a cs_error_t) and store the result
 * in 'rc'. As long as the result is CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL,
 * increment 'counter', sleep for 'counter' seconds, and evaluate 'code' again,
 * giving up once 'counter' reaches 'max'. Any other result ends the loop
 * immediately.
 *
 * 'rc' and 'counter' must be modifiable lvalues; 'code' is re-evaluated on
 * every attempt. All arguments are parenthesized so that arbitrary expressions
 * may be passed safely.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
88 
94 void
96 {
97  pcmk_cpg_handle = 0;
98  if (cluster->cpg_handle) {
99  crm_trace("Disconnecting CPG");
100  cpg_leave(cluster->cpg_handle, &cluster->group);
101  cpg_finalize(cluster->cpg_handle);
102  cluster->cpg_handle = 0;
103 
104  } else {
105  crm_info("No CPG connection");
106  }
107 }
108 
116 uint32_t
117 get_local_nodeid(cpg_handle_t handle)
118 {
119  cs_error_t rc = CS_OK;
120  int retries = 0;
121  static uint32_t local_nodeid = 0;
122  cpg_handle_t local_handle = handle;
123  cpg_callbacks_t cb = { };
124  int fd = -1;
125  uid_t found_uid = 0;
126  gid_t found_gid = 0;
127  pid_t found_pid = 0;
128  int rv;
129 
130  if(local_nodeid != 0) {
131  return local_nodeid;
132  }
133 
134  if(handle == 0) {
135  crm_trace("Creating connection");
136  cs_repeat(rc, retries, 5, cpg_initialize(&local_handle, &cb));
137  if (rc != CS_OK) {
138  crm_err("Could not connect to the CPG API: %s (%d)",
139  cs_strerror(rc), rc);
140  return 0;
141  }
142 
143  rc = cpg_fd_get(local_handle, &fd);
144  if (rc != CS_OK) {
145  crm_err("Could not obtain the CPG API connection: %s (%d)",
146  cs_strerror(rc), rc);
147  goto bail;
148  }
149 
150  /* CPG provider run as root (in given user namespace, anyway)? */
151  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152  &found_uid, &found_gid))) {
153  crm_err("CPG provider is not authentic:"
154  " process %lld (uid: %lld, gid: %lld)",
155  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156  (long long) found_uid, (long long) found_gid);
157  goto bail;
158  } else if (rv < 0) {
159  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160  strerror(-rv), -rv);
161  goto bail;
162  }
163  }
164 
165  if (rc == CS_OK) {
166  retries = 0;
167  crm_trace("Performing lookup");
168  cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169  }
170 
171  if (rc != CS_OK) {
172  crm_err("Could not get local node id from the CPG API: %s (%d)",
173  pcmk__cs_err_str(rc), rc);
174  }
175 
176 bail:
177  if(handle == 0) {
178  crm_trace("Closing connection");
179  cpg_finalize(local_handle);
180  }
181  crm_debug("Local nodeid is %u", local_nodeid);
182  return local_nodeid;
183 }
184 
193 static gboolean
194 crm_cs_flush_cb(gpointer data)
195 {
196  cs_message_timer = 0;
197  crm_cs_flush(data);
198  return FALSE;
199 }
200 
201 // Send no more than this many CPG messages in one flush
202 #define CS_SEND_MAX 200
203 
210 static void
211 crm_cs_flush(gpointer data)
212 {
213  unsigned int sent = 0;
214  guint queue_len = 0;
215  cs_error_t rc = 0;
216  cpg_handle_t *handle = (cpg_handle_t *) data;
217 
218  if (*handle == 0) {
219  crm_trace("Connection is dead");
220  return;
221  }
222 
223  queue_len = g_list_length(cs_message_queue);
224  if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225  crm_err("CPG queue has grown to %d", queue_len);
226 
227  } else if (queue_len == CS_SEND_MAX) {
228  crm_warn("CPG queue has grown to %d", queue_len);
229  }
230 
231  if (cs_message_timer != 0) {
232  /* There is already a timer, wait until it goes off */
233  crm_trace("Timer active %d", cs_message_timer);
234  return;
235  }
236 
237  while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238  struct iovec *iov = cs_message_queue->data;
239 
240  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241  if (rc != CS_OK) {
242  break;
243  }
244 
245  sent++;
246  crm_trace("CPG message sent, size=%llu",
247  (unsigned long long) iov->iov_len);
248 
249  cs_message_queue = g_list_remove(cs_message_queue, iov);
250  free(iov->iov_base);
251  free(iov);
252  }
253 
254  queue_len -= sent;
255  if ((sent > 1) || (cs_message_queue != NULL)) {
256  crm_info("Sent %u CPG messages (%d remaining): %s (%d)",
257  sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
258  } else {
259  crm_trace("Sent %u CPG messages (%d remaining): %s (%d)",
260  sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
261  }
262 
263  if (cs_message_queue) {
264  uint32_t delay_ms = 100;
265  if (rc != CS_OK) {
266  /* Proportionally more if sending failed but cap at 1s */
267  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
268  }
269  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
270  }
271 }
272 
281 static int
282 pcmk_cpg_dispatch(gpointer user_data)
283 {
284  cs_error_t rc = CS_OK;
285  crm_cluster_t *cluster = (crm_cluster_t *) user_data;
286 
287  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
288  if (rc != CS_OK) {
289  crm_err("Connection to the CPG API failed: %s (%d)",
290  pcmk__cs_err_str(rc), rc);
291  cpg_finalize(cluster->cpg_handle);
292  cluster->cpg_handle = 0;
293  return -1;
294 
295  } else if (cpg_evicted) {
296  crm_err("Evicted from CPG membership");
297  return -1;
298  }
299  return 0;
300 }
301 
302 static inline const char *
303 ais_dest(const pcmk__cpg_host_t *host)
304 {
305  if (host->local) {
306  return "local";
307  } else if (host->size > 0) {
308  return host->uname;
309  } else {
310  return "<all>";
311  }
312 }
313 
314 static inline const char *
315 msg_type2text(enum crm_ais_msg_types type)
316 {
317  const char *text = "unknown";
318 
319  switch (type) {
320  case crm_msg_none:
321  text = "unknown";
322  break;
323  case crm_msg_ais:
324  text = "ais";
325  break;
326  case crm_msg_cib:
327  text = "cib";
328  break;
329  case crm_msg_crmd:
330  text = "crmd";
331  break;
332  case crm_msg_pe:
333  text = "pengine";
334  break;
335  case crm_msg_te:
336  text = "tengine";
337  break;
338  case crm_msg_lrmd:
339  text = "lrmd";
340  break;
341  case crm_msg_attrd:
342  text = "attrd";
343  break;
344  case crm_msg_stonithd:
345  text = "stonithd";
346  break;
347  case crm_msg_stonith_ng:
348  text = "stonith-ng";
349  break;
350  }
351  return text;
352 }
353 
362 static bool
363 check_message_sanity(const pcmk__cpg_msg_t *msg)
364 {
365  int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
366 
367  if (payload_size < 1) {
368  crm_err("%sCPG message %d from %s invalid: "
369  "Claimed size of %d bytes is too small "
370  CRM_XS " from %s[%u] to %s@%s",
371  (msg->is_compressed? "Compressed " : ""),
372  msg->id, ais_dest(&(msg->sender)),
373  (int) msg->header.size,
374  msg_type2text(msg->sender.type), msg->sender.pid,
375  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
376  return false;
377  }
378 
379  if (msg->header.error != CS_OK) {
380  crm_err("%sCPG message %d from %s invalid: "
381  "Sender indicated error %d "
382  CRM_XS " from %s[%u] to %s@%s",
383  (msg->is_compressed? "Compressed " : ""),
384  msg->id, ais_dest(&(msg->sender)),
385  msg->header.error,
386  msg_type2text(msg->sender.type), msg->sender.pid,
387  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
388  return false;
389  }
390 
391  if (msg_data_len(msg) != payload_size) {
392  crm_err("%sCPG message %d from %s invalid: "
393  "Total size %d inconsistent with payload size %d "
394  CRM_XS " from %s[%u] to %s@%s",
395  (msg->is_compressed? "Compressed " : ""),
396  msg->id, ais_dest(&(msg->sender)),
397  (int) msg->header.size, (int) msg_data_len(msg),
398  msg_type2text(msg->sender.type), msg->sender.pid,
399  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
400  return false;
401  }
402 
403  if (!msg->is_compressed &&
404  /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
405  * but checking the last byte or two should be quick
406  */
407  (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
408  || (msg->data[msg->size - 1] != '\0'))) {
409  crm_err("CPG message %d from %s invalid: "
410  "Payload does not end at byte %llu "
411  CRM_XS " from %s[%u] to %s@%s",
412  msg->id, ais_dest(&(msg->sender)),
413  (unsigned long long) msg->size,
414  msg_type2text(msg->sender.type), msg->sender.pid,
415  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
416  return false;
417  }
418 
419  crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
420  (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
421  msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
422  ais_dest(&(msg->sender)),
423  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
424  return true;
425 }
426 
443 char *
444 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
445  uint32_t *kind, const char **from)
446 {
447  char *data = NULL;
448  pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
449 
450  if(handle) {
451  // Do filtering and field massaging
452  uint32_t local_nodeid = get_local_nodeid(handle);
453  const char *local_name = get_local_node_name();
454 
455  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
456  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
457  return NULL;
458 
459  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
460  /* Not for us */
461  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
462  return NULL;
463  } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
464  /* Not for us */
465  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
466  return NULL;
467  }
468 
469  msg->sender.id = nodeid;
470  if (msg->sender.size == 0) {
471  crm_node_t *peer = crm_get_peer(nodeid, NULL);
472 
473  if (peer == NULL) {
474  crm_err("Peer with nodeid=%u is unknown", nodeid);
475 
476  } else if (peer->uname == NULL) {
477  crm_err("No uname for peer with nodeid=%u", nodeid);
478 
479  } else {
480  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
481  msg->sender.size = strlen(peer->uname);
482  memset(msg->sender.uname, 0, MAX_NAME);
483  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
484  }
485  }
486  }
487 
488  crm_trace("Got new%s message (size=%d, %d, %d)",
489  msg->is_compressed ? " compressed" : "",
490  msg_data_len(msg), msg->size, msg->compressed_size);
491 
492  if (kind != NULL) {
493  *kind = msg->header.id;
494  }
495  if (from != NULL) {
496  *from = msg->sender.uname;
497  }
498 
499  if (msg->is_compressed && msg->size > 0) {
500  int rc = BZ_OK;
501  char *uncompressed = NULL;
502  unsigned int new_size = msg->size + 1;
503 
504  if (!check_message_sanity(msg)) {
505  goto badmsg;
506  }
507 
508  crm_trace("Decompressing message data");
509  uncompressed = calloc(1, new_size);
510  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
511 
512  if (rc != BZ_OK) {
513  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
514  bz2_strerror(rc), rc);
515  free(uncompressed);
516  goto badmsg;
517  }
518 
519  CRM_ASSERT(rc == BZ_OK);
520  CRM_ASSERT(new_size == msg->size);
521 
522  data = uncompressed;
523 
524  } else if (!check_message_sanity(msg)) {
525  goto badmsg;
526 
527  } else {
528  data = strdup(msg->data);
529  }
530 
531  // Is this necessary?
532  crm_get_peer(msg->sender.id, msg->sender.uname);
533 
534  crm_trace("Payload: %.200s", data);
535  return data;
536 
537  badmsg:
538  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
539  " min=%d, total=%d, size=%d, bz2_size=%d",
540  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
541  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
542  msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
543  msg->header.size, msg->size, msg->compressed_size);
544 
545  free(data);
546  return NULL;
547 }
548 
560 static int
561 cmp_member_list_nodeid(const void *first, const void *second)
562 {
563  const struct cpg_address *const a = *((const struct cpg_address **) first),
564  *const b = *((const struct cpg_address **) second);
565  if (a->nodeid < b->nodeid) {
566  return -1;
567  } else if (a->nodeid > b->nodeid) {
568  return 1;
569  }
570  /* don't bother with "reason" nor "pid" */
571  return 0;
572 }
573 
582 static const char *
583 cpgreason2str(cpg_reason_t reason)
584 {
585  switch (reason) {
586  case CPG_REASON_JOIN: return " via cpg_join";
587  case CPG_REASON_LEAVE: return " via cpg_leave";
588  case CPG_REASON_NODEDOWN: return " via cluster exit";
589  case CPG_REASON_NODEUP: return " via cluster join";
590  case CPG_REASON_PROCDOWN: return " for unknown reason";
591  default: break;
592  }
593  return "";
594 }
595 
604 static inline const char *
605 peer_name(crm_node_t *peer)
606 {
607  if (peer == NULL) {
608  return "unknown node";
609  } else if (peer->uname == NULL) {
610  return "peer node";
611  } else {
612  return peer->uname;
613  }
614 }
615 
628 void
629 pcmk_cpg_membership(cpg_handle_t handle,
630  const struct cpg_name *groupName,
631  const struct cpg_address *member_list, size_t member_list_entries,
632  const struct cpg_address *left_list, size_t left_list_entries,
633  const struct cpg_address *joined_list, size_t joined_list_entries)
634 {
635  int i;
636  gboolean found = FALSE;
637  static int counter = 0;
638  uint32_t local_nodeid = get_local_nodeid(handle);
639  const struct cpg_address *key, **sorted;
640 
641  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
642  CRM_ASSERT(sorted != NULL);
643 
644  for (size_t iter = 0; iter < member_list_entries; iter++) {
645  sorted[iter] = member_list + iter;
646  }
647  /* so that the cross-matching multiply-subscribed nodes is then cheap */
648  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
649  cmp_member_list_nodeid);
650 
651  for (i = 0; i < left_list_entries; i++) {
652  crm_node_t *peer = pcmk__search_cluster_node_cache(left_list[i].nodeid,
653  NULL);
654  const struct cpg_address **rival = NULL;
655 
656  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
657  and not playing by this rule may go wild in case of multiple
658  residual instances of the same pacemaker daemon at the same node
659  -- we must ensure that the possible local rival(s) won't make us
660  cry out and bail (e.g. when they quit themselves), since all the
661  surrounding logic denies this simple fact that the full membership
662  is discriminated also per the PID of the process beside mere node
663  ID (and implicitly, group ID); practically, this will be sound in
664  terms of not preventing progress, since all the CPG joiners are
665  also API end-point carriers, and that's what matters locally
666  (who's the winner);
667  remotely, we will just compare leave_list and member_list and if
668  the left process has its node retained in member_list (under some
669  other PID, anyway) we will just ignore it as well
670  XXX: long-term fix is to establish in-out PID-aware tracking? */
671  if (peer) {
672  key = &left_list[i];
673  rival = bsearch(&key, sorted, member_list_entries,
674  sizeof(const struct cpg_address *),
675  cmp_member_list_nodeid);
676  }
677 
678  if (rival == NULL) {
679  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
680  groupName->value, counter, peer_name(peer),
681  left_list[i].nodeid, left_list[i].pid,
682  cpgreason2str(left_list[i].reason));
683  if (peer) {
684  crm_update_peer_proc(__func__, peer, crm_proc_cpg,
685  OFFLINESTATUS);
686  }
687  } else if (left_list[i].nodeid == local_nodeid) {
688  crm_warn("Group %s event %d: duplicate local pid %u left%s",
689  groupName->value, counter,
690  left_list[i].pid, cpgreason2str(left_list[i].reason));
691  } else {
692  crm_warn("Group %s event %d: "
693  "%s (node %u) duplicate pid %u left%s (%u remains)",
694  groupName->value, counter, peer_name(peer),
695  left_list[i].nodeid, left_list[i].pid,
696  cpgreason2str(left_list[i].reason), (*rival)->pid);
697  }
698  }
699  free(sorted);
700  sorted = NULL;
701 
702  for (i = 0; i < joined_list_entries; i++) {
703  crm_info("Group %s event %d: node %u pid %u joined%s",
704  groupName->value, counter, joined_list[i].nodeid,
705  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
706  }
707 
708  for (i = 0; i < member_list_entries; i++) {
709  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
710 
711  if (member_list[i].nodeid == local_nodeid
712  && member_list[i].pid != getpid()) {
713  /* see the note above */
714  crm_warn("Group %s event %d: detected duplicate local pid %u",
715  groupName->value, counter, member_list[i].pid);
716  continue;
717  }
718  crm_info("Group %s event %d: %s (node %u pid %u) is member",
719  groupName->value, counter, peer_name(peer),
720  member_list[i].nodeid, member_list[i].pid);
721 
722  /* If the caller left auto-reaping enabled, this will also update the
723  * state to member.
724  */
725  peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
726  ONLINESTATUS);
727 
728  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
729  /* The node is a CPG member, but we currently think it's not a
730  * cluster member. This is possible only if auto-reaping was
731  * disabled. The node may be joining, and we happened to get the CPG
732  * notification before the quorum notification; or the node may have
733  * just died, and we are processing its final messages; or a bug
734  * has affected the peer cache.
735  */
736  time_t now = time(NULL);
737 
738  if (peer->when_lost == 0) {
739  // Track when we first got into this contradictory state
740  peer->when_lost = now;
741 
742  } else if (now > (peer->when_lost + 60)) {
743  // If it persists for more than a minute, update the state
744  crm_warn("Node %u is member of group %s but was believed offline",
745  member_list[i].nodeid, groupName->value);
746  pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
747  }
748  }
749 
750  if (local_nodeid == member_list[i].nodeid) {
751  found = TRUE;
752  }
753  }
754 
755  if (!found) {
756  crm_err("Local node was evicted from group %s", groupName->value);
757  cpg_evicted = true;
758  }
759 
760  counter++;
761 }
762 
770 gboolean
772 {
773  cs_error_t rc;
774  int fd = -1;
775  int retries = 0;
776  uint32_t id = 0;
777  crm_node_t *peer = NULL;
778  cpg_handle_t handle = 0;
779  const char *message_name = pcmk__message_name(crm_system_name);
780  uid_t found_uid = 0;
781  gid_t found_gid = 0;
782  pid_t found_pid = 0;
783  int rv;
784 
785  struct mainloop_fd_callbacks cpg_fd_callbacks = {
786  .dispatch = pcmk_cpg_dispatch,
787  .destroy = cluster->destroy,
788  };
789 
790  cpg_callbacks_t cpg_callbacks = {
791  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
792  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
793  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
794  /* .cpg_confchg_fn = pcmk_cpg_membership, */
795  };
796 
797  cpg_evicted = false;
798  cluster->group.length = 0;
799  cluster->group.value[0] = 0;
800 
801  /* group.value is char[128] */
802  strncpy(cluster->group.value, message_name, 127);
803  cluster->group.value[127] = 0;
804  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
805 
806  cs_repeat(rc, retries, 30, cpg_initialize(&handle, &cpg_callbacks));
807  if (rc != CS_OK) {
808  crm_err("Could not connect to the CPG API: %s (%d)",
809  cs_strerror(rc), rc);
810  goto bail;
811  }
812 
813  rc = cpg_fd_get(handle, &fd);
814  if (rc != CS_OK) {
815  crm_err("Could not obtain the CPG API connection: %s (%d)",
816  cs_strerror(rc), rc);
817  goto bail;
818  }
819 
820  /* CPG provider run as root (in given user namespace, anyway)? */
821  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
822  &found_uid, &found_gid))) {
823  crm_err("CPG provider is not authentic:"
824  " process %lld (uid: %lld, gid: %lld)",
825  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
826  (long long) found_uid, (long long) found_gid);
827  rc = CS_ERR_ACCESS;
828  goto bail;
829  } else if (rv < 0) {
830  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
831  strerror(-rv), -rv);
832  rc = CS_ERR_ACCESS;
833  goto bail;
834  }
835 
836  id = get_local_nodeid(handle);
837  if (id == 0) {
838  crm_err("Could not get local node id from the CPG API");
839  goto bail;
840 
841  }
842  cluster->nodeid = id;
843 
844  retries = 0;
845  cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
846  if (rc != CS_OK) {
847  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
848  goto bail;
849  }
850 
851  pcmk_cpg_handle = handle;
852  cluster->cpg_handle = handle;
853  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
854 
855  bail:
856  if (rc != CS_OK) {
857  cpg_finalize(handle);
858  return FALSE;
859  }
860 
861  peer = crm_get_peer(id, NULL);
863  return TRUE;
864 }
865 
876 gboolean
877 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
878 {
879  gboolean rc = TRUE;
880  char *data = NULL;
881 
882  data = dump_xml_unformatted(msg);
883  rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
884  free(data);
885  return rc;
886 }
887 
900 gboolean
901 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
902  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
903 {
904  static int msg_id = 0;
905  static int local_pid = 0;
906  static int local_name_len = 0;
907  static const char *local_name = NULL;
908 
909  char *target = NULL;
910  struct iovec *iov;
911  pcmk__cpg_msg_t *msg = NULL;
913 
914  switch (msg_class) {
915  case crm_class_cluster:
916  break;
917  default:
918  crm_err("Invalid message class: %d", msg_class);
919  return FALSE;
920  }
921 
922  CRM_CHECK(dest != crm_msg_ais, return FALSE);
923 
924  if (local_name == NULL) {
925  local_name = get_local_node_name();
926  }
927  if ((local_name_len == 0) && (local_name != NULL)) {
928  local_name_len = strlen(local_name);
929  }
930 
931  if (data == NULL) {
932  data = "";
933  }
934 
935  if (local_pid == 0) {
936  local_pid = getpid();
937  }
938 
939  if (sender == crm_msg_none) {
940  sender = local_pid;
941  }
942 
943  msg = calloc(1, sizeof(pcmk__cpg_msg_t));
944 
945  msg_id++;
946  msg->id = msg_id;
947  msg->header.id = msg_class;
948  msg->header.error = CS_OK;
949 
950  msg->host.type = dest;
951  msg->host.local = local;
952 
953  if (node) {
954  if (node->uname) {
955  target = strdup(node->uname);
956  msg->host.size = strlen(node->uname);
957  memset(msg->host.uname, 0, MAX_NAME);
958  memcpy(msg->host.uname, node->uname, msg->host.size);
959  } else {
960  target = crm_strdup_printf("%u", node->id);
961  }
962  msg->host.id = node->id;
963  } else {
964  target = strdup("all");
965  }
966 
967  msg->sender.id = 0;
968  msg->sender.type = sender;
969  msg->sender.pid = local_pid;
970  msg->sender.size = local_name_len;
971  memset(msg->sender.uname, 0, MAX_NAME);
972  if ((local_name != NULL) && (msg->sender.size != 0)) {
973  memcpy(msg->sender.uname, local_name, msg->sender.size);
974  }
975 
976  msg->size = 1 + strlen(data);
977  msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
978 
979  if (msg->size < CRM_BZ2_THRESHOLD) {
980  msg = pcmk__realloc(msg, msg->header.size);
981  memcpy(msg->data, data, msg->size);
982 
983  } else {
984  char *compressed = NULL;
985  unsigned int new_size = 0;
986  char *uncompressed = strdup(data);
987 
988  if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
989  &compressed, &new_size) == pcmk_rc_ok) {
990 
991  msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
992  msg = pcmk__realloc(msg, msg->header.size);
993  memcpy(msg->data, compressed, new_size);
994 
995  msg->is_compressed = TRUE;
996  msg->compressed_size = new_size;
997 
998  } else {
999  // cppcheck seems not to understand the abort logic in pcmk__realloc
1000  // cppcheck-suppress memleak
1001  msg = pcmk__realloc(msg, msg->header.size);
1002  memcpy(msg->data, data, msg->size);
1003  }
1004 
1005  free(uncompressed);
1006  free(compressed);
1007  }
1008 
1009  iov = calloc(1, sizeof(struct iovec));
1010  iov->iov_base = msg;
1011  iov->iov_len = msg->header.size;
1012 
1013  if (msg->compressed_size) {
1014  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1015  msg->id, target, (unsigned long long) iov->iov_len,
1016  msg->compressed_size, data);
1017  } else {
1018  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1019  msg->id, target, (unsigned long long) iov->iov_len,
1020  msg->size, data);
1021  }
1022  free(target);
1023 
1024  cs_message_queue = g_list_append(cs_message_queue, iov);
1025  crm_cs_flush(&pcmk_cpg_handle);
1026 
1027  return TRUE;
1028 }
1029 
1037 enum crm_ais_msg_types
1038 text2msg_type(const char *text)
1039 {
1040  int type = crm_msg_none;
1041 
1042  CRM_CHECK(text != NULL, return type);
1043  text = pcmk__message_name(text);
1044  if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1045  type = crm_msg_ais;
1046  } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1047  type = crm_msg_cib;
1048  } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1049  type = crm_msg_crmd;
1050  } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1051  type = crm_msg_te;
1052  } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1053  type = crm_msg_pe;
1054  } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1055  type = crm_msg_lrmd;
1056  } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1057  type = crm_msg_stonithd;
1058  } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1059  type = crm_msg_stonith_ng;
1060  } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1061  type = crm_msg_attrd;
1062 
1063  } else {
1064  /* This will normally be a transient client rather than
1065  * a cluster daemon. Set the type to the pid of the client
1066  */
1067  int scan_rc = sscanf(text, "%d", &type);
1068 
1069  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1070  /* Ensure it's sane */
1071  type = crm_msg_none;
1072  }
1073  }
1074  return type;
1075 }
pcmk__cpg_host_t host
Definition: cpg.c:49
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:218
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: messages.c:182
#define cs_repeat(rc, counter, max, code)
Definition: cpg.c:78
uint32_t size
Definition: cpg.c:49
#define crm_notice(fmt, args...)
Definition: logging.h:352
struct election_s __attribute__
const char * bz2_strerror(int rc)
Definition: results.c:726
char data[0]
Definition: cpg.c:55
crm_ais_msg_types
Definition: cluster.h:103
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:975
uint32_t nodeid
Definition: cluster.h:80
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition: strings.c:929
uint32_t id
Definition: cluster.h:66
pcmk__cpg_host_t sender
Definition: cpg.c:50
const char * get_local_node_name(void)
Get the local node's name.
Definition: cluster.c:155
void(* destroy)(gpointer)
Definition: cluster.h:82
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:53
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition: cpg.c:72
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry.
Definition: membership.c:702
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition: mainloop.h:137
char * crm_system_name
Definition: utils.c:54
#define CS_SEND_MAX
Definition: cpg.c:202
enum crm_ais_msg_types type
Definition: cpg.c:48
char * strerror(int errnum)
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:901
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message.
Definition: cpg.c:444
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:102
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG.
Definition: cpg.c:95
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition: cpg.c:55
gboolean local
Definition: cpg.c:47
#define crm_warn(fmt, args...)
Definition: logging.h:351
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1079
int rc
Definition: pcmk_fence.c:35
uint32_t pid
Definition: cpg.c:46
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string.
Definition: cpg.c:1038
#define crm_debug(fmt, args...)
Definition: logging.h:355
time_t when_lost
Definition: cluster.h:67
#define crm_trace(fmt, args...)
Definition: logging.h:356
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
Definition: membership.c:562
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:876
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define CRM_SYSTEM_PENGINE
Definition: crm.h:108
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:748
#define CRM_NODE_MEMBER
Definition: cluster.h:33
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:877
uint32_t id
Definition: cpg.c:45
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition: crm.h:76
uint32_t compressed_size
Definition: cpg.c:53
#define CRM_SYSTEM_CRMD
Definition: crm.h:106
const char * target
Definition: pcmk_fence.c:29
crm_ais_msg_class
Definition: cluster.h:99
#define CRM_XS
Definition: logging.h:54
#define CRM_SYSTEM_STONITHD
Definition: crm.h:110
#define CRM_SYSTEM_CIB
Definition: crm.h:105
#define CRM_SYSTEM_TENGINE
Definition: crm.h:109
gboolean is_compressed
Definition: cpg.c:47
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG)
Definition: cpg.c:117
#define crm_err(fmt, args...)
Definition: logging.h:350
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:181
#define CRM_ASSERT(expr)
Definition: results.h:42
#define OFFLINESTATUS
Definition: util.h:39
#define CRM_BZ2_THRESHOLD
Definition: xml.h:47
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2017
#define CRM_SYSTEM_LRMD
Definition: crm.h:107
char uname[MAX_NAME]
Definition: cpg.c:50
char * state
Definition: cluster.h:55
#define msg_data_len(msg)
Definition: cpg.c:76
IPC interface to Pacemaker daemons.
char * uname
Definition: cluster.h:53
#define ONLINESTATUS
Definition: util.h:38
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event.
Definition: cpg.c:629
#define crm_info(fmt, args...)
Definition: logging.h:353
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG.
Definition: cpg.c:771
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
Definition: ipc_client.c:1382