pacemaker  2.1.1-52dc28db4
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2020 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipc_common.h>
23 #include <qb/qbipcc.h>
24 #include <qb/qbutil.h>
25 
26 #include <corosync/corodefs.h>
27 #include <corosync/corotypes.h>
28 #include <corosync/hdb.h>
29 #include <corosync/cpg.h>
30 
31 #include <crm/msg_xml.h>
32 
33 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
34 #include "crmcluster_private.h"
35 
36 /* @TODO Once we can update the public API to require crm_cluster_t* in more
37  * functions, we can ditch this in favor of cluster->cpg_handle.
38  */
// CPG handle handed to crm_cs_flush() when sending; assigned on a successful
// CPG connect and reset to 0 on disconnect
39 static cpg_handle_t pcmk_cpg_handle = 0;
40 
41 // @TODO These could be moved to crm_cluster_t* at that time as well
// Set when a membership event no longer lists the local node; reset on connect
42 static bool cpg_evicted = false;
// Outgoing messages (struct iovec *) waiting to be multicast by crm_cs_flush()
43 static GList *cs_message_queue = NULL;
// ID returned by g_timeout_add() for a pending re-flush, or 0 if none is pending
44 static int cs_message_timer = 0;
45 
46 struct pcmk__cpg_host_s {
47  uint32_t id;
48  uint32_t pid;
49  gboolean local;
51  uint32_t size;
52  char uname[MAX_NAME];
53 } __attribute__ ((packed));
54 
55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56 
57 struct pcmk__cpg_msg_s {
58  struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59  uint32_t id;
60  gboolean is_compressed;
61 
64 
65  uint32_t size;
66  uint32_t compressed_size;
67  /* 584 bytes */
68  char data[0];
69 
70 } __attribute__ ((packed));
71 
72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73 
74 static void crm_cs_flush(gpointer data);
75 
// Number of payload bytes actually carried by a message (the compressed size
// if the payload is compressed, otherwise the plain size). The argument is
// parenthesized so that any pointer expression may be passed.
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
77 
/* Run a Corosync call, retrying while it reports CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL. Before each retry, counter is incremented and the caller
 * sleeps for that many seconds; retrying stops once counter reaches max.
 * rc receives the result of the last attempt. Note that the sleep also happens
 * after the final failed attempt.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
88 
94 void
96 {
97  pcmk_cpg_handle = 0;
98  if (cluster->cpg_handle) {
99  crm_trace("Disconnecting CPG");
100  cpg_leave(cluster->cpg_handle, &cluster->group);
101  cpg_finalize(cluster->cpg_handle);
102  cluster->cpg_handle = 0;
103 
104  } else {
105  crm_info("No CPG connection");
106  }
107 }
108 
116 uint32_t
117 get_local_nodeid(cpg_handle_t handle)
118 {
119  cs_error_t rc = CS_OK;
120  int retries = 0;
121  static uint32_t local_nodeid = 0;
122  cpg_handle_t local_handle = handle;
123  cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
124  int fd = -1;
125  uid_t found_uid = 0;
126  gid_t found_gid = 0;
127  pid_t found_pid = 0;
128  int rv;
129 
130  if(local_nodeid != 0) {
131  return local_nodeid;
132  }
133 
134  if(handle == 0) {
135  crm_trace("Creating connection");
136  cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
137  if (rc != CS_OK) {
138  crm_err("Could not connect to the CPG API: %s (%d)",
139  cs_strerror(rc), rc);
140  return 0;
141  }
142 
143  rc = cpg_fd_get(local_handle, &fd);
144  if (rc != CS_OK) {
145  crm_err("Could not obtain the CPG API connection: %s (%d)",
146  cs_strerror(rc), rc);
147  goto bail;
148  }
149 
150  /* CPG provider run as root (in given user namespace, anyway)? */
151  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152  &found_uid, &found_gid))) {
153  crm_err("CPG provider is not authentic:"
154  " process %lld (uid: %lld, gid: %lld)",
155  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156  (long long) found_uid, (long long) found_gid);
157  goto bail;
158  } else if (rv < 0) {
159  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160  strerror(-rv), -rv);
161  goto bail;
162  }
163  }
164 
165  if (rc == CS_OK) {
166  retries = 0;
167  crm_trace("Performing lookup");
168  cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169  }
170 
171  if (rc != CS_OK) {
172  crm_err("Could not get local node id from the CPG API: %s (%d)",
173  pcmk__cs_err_str(rc), rc);
174  }
175 
176 bail:
177  if(handle == 0) {
178  crm_trace("Closing connection");
179  cpg_finalize(local_handle);
180  }
181  crm_debug("Local nodeid is %u", local_nodeid);
182  return local_nodeid;
183 }
184 
193 static gboolean
194 crm_cs_flush_cb(gpointer data)
195 {
196  cs_message_timer = 0;
197  crm_cs_flush(data);
198  return FALSE;
199 }
200 
201 // Send no more than this many CPG messages in one flush
202 #define CS_SEND_MAX 200
203 
210 static void
211 crm_cs_flush(gpointer data)
212 {
213  unsigned int sent = 0;
214  guint queue_len = 0;
215  cs_error_t rc = 0;
216  cpg_handle_t *handle = (cpg_handle_t *) data;
217 
218  if (*handle == 0) {
219  crm_trace("Connection is dead");
220  return;
221  }
222 
223  queue_len = g_list_length(cs_message_queue);
224  if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225  crm_err("CPG queue has grown to %d", queue_len);
226 
227  } else if (queue_len == CS_SEND_MAX) {
228  crm_warn("CPG queue has grown to %d", queue_len);
229  }
230 
231  if (cs_message_timer != 0) {
232  /* There is already a timer, wait until it goes off */
233  crm_trace("Timer active %d", cs_message_timer);
234  return;
235  }
236 
237  while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238  struct iovec *iov = cs_message_queue->data;
239 
240  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241  if (rc != CS_OK) {
242  break;
243  }
244 
245  sent++;
246  crm_trace("CPG message sent, size=%llu",
247  (unsigned long long) iov->iov_len);
248 
249  cs_message_queue = g_list_remove(cs_message_queue, iov);
250  free(iov->iov_base);
251  free(iov);
252  }
253 
254  queue_len -= sent;
255  if ((sent > 1) || (cs_message_queue != NULL)) {
256  crm_info("Sent %u CPG messages (%d remaining): %s (%d)",
257  sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
258  } else {
259  crm_trace("Sent %u CPG messages (%d remaining): %s (%d)",
260  sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
261  }
262 
263  if (cs_message_queue) {
264  uint32_t delay_ms = 100;
265  if (rc != CS_OK) {
266  /* Proportionally more if sending failed but cap at 1s */
267  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
268  }
269  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
270  }
271 }
272 
281 static int
282 pcmk_cpg_dispatch(gpointer user_data)
283 {
284  cs_error_t rc = CS_OK;
285  crm_cluster_t *cluster = (crm_cluster_t *) user_data;
286 
287  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
288  if (rc != CS_OK) {
289  crm_err("Connection to the CPG API failed: %s (%d)",
290  pcmk__cs_err_str(rc), rc);
291  cpg_finalize(cluster->cpg_handle);
292  cluster->cpg_handle = 0;
293  return -1;
294 
295  } else if (cpg_evicted) {
296  crm_err("Evicted from CPG membership");
297  return -1;
298  }
299  return 0;
300 }
301 
302 static inline const char *
303 ais_dest(const pcmk__cpg_host_t *host)
304 {
305  if (host->local) {
306  return "local";
307  } else if (host->size > 0) {
308  return host->uname;
309  } else {
310  return "<all>";
311  }
312 }
313 
314 static inline const char *
315 msg_type2text(enum crm_ais_msg_types type)
316 {
317  const char *text = "unknown";
318 
319  switch (type) {
320  case crm_msg_none:
321  text = "unknown";
322  break;
323  case crm_msg_ais:
324  text = "ais";
325  break;
326  case crm_msg_cib:
327  text = "cib";
328  break;
329  case crm_msg_crmd:
330  text = "crmd";
331  break;
332  case crm_msg_pe:
333  text = "pengine";
334  break;
335  case crm_msg_te:
336  text = "tengine";
337  break;
338  case crm_msg_lrmd:
339  text = "lrmd";
340  break;
341  case crm_msg_attrd:
342  text = "attrd";
343  break;
344  case crm_msg_stonithd:
345  text = "stonithd";
346  break;
347  case crm_msg_stonith_ng:
348  text = "stonith-ng";
349  break;
350  }
351  return text;
352 }
353 
362 static bool
363 check_message_sanity(const pcmk__cpg_msg_t *msg)
364 {
365  int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
366 
367  if (payload_size < 1) {
368  crm_err("%sCPG message %d from %s invalid: "
369  "Claimed size of %d bytes is too small "
370  CRM_XS " from %s[%u] to %s@%s",
371  (msg->is_compressed? "Compressed " : ""),
372  msg->id, ais_dest(&(msg->sender)),
373  (int) msg->header.size,
374  msg_type2text(msg->sender.type), msg->sender.pid,
375  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
376  return false;
377  }
378 
379  if (msg->header.error != CS_OK) {
380  crm_err("%sCPG message %d from %s invalid: "
381  "Sender indicated error %d "
382  CRM_XS " from %s[%u] to %s@%s",
383  (msg->is_compressed? "Compressed " : ""),
384  msg->id, ais_dest(&(msg->sender)),
385  msg->header.error,
386  msg_type2text(msg->sender.type), msg->sender.pid,
387  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
388  return false;
389  }
390 
391  if (msg_data_len(msg) != payload_size) {
392  crm_err("%sCPG message %d from %s invalid: "
393  "Total size %d inconsistent with payload size %d "
394  CRM_XS " from %s[%u] to %s@%s",
395  (msg->is_compressed? "Compressed " : ""),
396  msg->id, ais_dest(&(msg->sender)),
397  (int) msg->header.size, (int) msg_data_len(msg),
398  msg_type2text(msg->sender.type), msg->sender.pid,
399  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
400  return false;
401  }
402 
403  if (!msg->is_compressed &&
404  /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
405  * but checking the last byte or two should be quick
406  */
407  (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
408  || (msg->data[msg->size - 1] != '\0'))) {
409  crm_err("CPG message %d from %s invalid: "
410  "Payload does not end at byte %llu "
411  CRM_XS " from %s[%u] to %s@%s",
412  msg->id, ais_dest(&(msg->sender)),
413  (unsigned long long) msg->size,
414  msg_type2text(msg->sender.type), msg->sender.pid,
415  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
416  return false;
417  }
418 
419  crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
420  (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
421  msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
422  ais_dest(&(msg->sender)),
423  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
424  return true;
425 }
426 
443 char *
444 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
445  uint32_t *kind, const char **from)
446 {
447  char *data = NULL;
448  pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
449 
450  if(handle) {
451  // Do filtering and field massaging
452  uint32_t local_nodeid = get_local_nodeid(handle);
453  const char *local_name = get_local_node_name();
454 
455  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
456  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
457  return NULL;
458 
459  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
460  /* Not for us */
461  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
462  return NULL;
463  } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
464  /* Not for us */
465  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
466  return NULL;
467  }
468 
469  msg->sender.id = nodeid;
470  if (msg->sender.size == 0) {
471  crm_node_t *peer = crm_get_peer(nodeid, NULL);
472 
473  if (peer == NULL) {
474  crm_err("Peer with nodeid=%u is unknown", nodeid);
475 
476  } else if (peer->uname == NULL) {
477  crm_err("No uname for peer with nodeid=%u", nodeid);
478 
479  } else {
480  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
481  msg->sender.size = strlen(peer->uname);
482  memset(msg->sender.uname, 0, MAX_NAME);
483  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
484  }
485  }
486  }
487 
488  crm_trace("Got new%s message (size=%d, %d, %d)",
489  msg->is_compressed ? " compressed" : "",
490  msg_data_len(msg), msg->size, msg->compressed_size);
491 
492  if (kind != NULL) {
493  *kind = msg->header.id;
494  }
495  if (from != NULL) {
496  *from = msg->sender.uname;
497  }
498 
499  if (msg->is_compressed && msg->size > 0) {
500  int rc = BZ_OK;
501  char *uncompressed = NULL;
502  unsigned int new_size = msg->size + 1;
503 
504  if (!check_message_sanity(msg)) {
505  goto badmsg;
506  }
507 
508  crm_trace("Decompressing message data");
509  uncompressed = calloc(1, new_size);
510  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
511 
512  if (rc != BZ_OK) {
513  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
514  bz2_strerror(rc), rc);
515  free(uncompressed);
516  goto badmsg;
517  }
518 
519  CRM_ASSERT(rc == BZ_OK);
520  CRM_ASSERT(new_size == msg->size);
521 
522  data = uncompressed;
523 
524  } else if (!check_message_sanity(msg)) {
525  goto badmsg;
526 
527  } else {
528  data = strdup(msg->data);
529  }
530 
531  // Is this necessary?
532  crm_get_peer(msg->sender.id, msg->sender.uname);
533 
534  crm_trace("Payload: %.200s", data);
535  return data;
536 
537  badmsg:
538  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
539  " min=%d, total=%d, size=%d, bz2_size=%d",
540  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
541  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
542  msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
543  msg->header.size, msg->size, msg->compressed_size);
544 
545  free(data);
546  return NULL;
547 }
548 
560 static int
561 cmp_member_list_nodeid(const void *first, const void *second)
562 {
563  const struct cpg_address *const a = *((const struct cpg_address **) first),
564  *const b = *((const struct cpg_address **) second);
565  if (a->nodeid < b->nodeid) {
566  return -1;
567  } else if (a->nodeid > b->nodeid) {
568  return 1;
569  }
570  /* don't bother with "reason" nor "pid" */
571  return 0;
572 }
573 
582 static const char *
583 cpgreason2str(cpg_reason_t reason)
584 {
585  switch (reason) {
586  case CPG_REASON_JOIN: return " via cpg_join";
587  case CPG_REASON_LEAVE: return " via cpg_leave";
588  case CPG_REASON_NODEDOWN: return " via cluster exit";
589  case CPG_REASON_NODEUP: return " via cluster join";
590  case CPG_REASON_PROCDOWN: return " for unknown reason";
591  default: break;
592  }
593  return "";
594 }
595 
604 static inline const char *
605 peer_name(crm_node_t *peer)
606 {
607  if (peer == NULL) {
608  return "unknown node";
609  } else if (peer->uname == NULL) {
610  return "peer node";
611  } else {
612  return peer->uname;
613  }
614 }
615 
628 void
629 pcmk_cpg_membership(cpg_handle_t handle,
630  const struct cpg_name *groupName,
631  const struct cpg_address *member_list, size_t member_list_entries,
632  const struct cpg_address *left_list, size_t left_list_entries,
633  const struct cpg_address *joined_list, size_t joined_list_entries)
634 {
635  int i;
636  gboolean found = FALSE;
637  static int counter = 0;
638  uint32_t local_nodeid = get_local_nodeid(handle);
639  const struct cpg_address *key, **sorted;
640 
641  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
642  CRM_ASSERT(sorted != NULL);
643 
644  for (size_t iter = 0; iter < member_list_entries; iter++) {
645  sorted[iter] = member_list + iter;
646  }
647  /* so that the cross-matching multiply-subscribed nodes is then cheap */
648  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
649  cmp_member_list_nodeid);
650 
651  for (i = 0; i < left_list_entries; i++) {
652  crm_node_t *peer = pcmk__search_cluster_node_cache(left_list[i].nodeid,
653  NULL);
654  const struct cpg_address **rival = NULL;
655 
656  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
657  and not playing by this rule may go wild in case of multiple
658  residual instances of the same pacemaker daemon at the same node
659  -- we must ensure that the possible local rival(s) won't make us
660  cry out and bail (e.g. when they quit themselves), since all the
661  surrounding logic denies this simple fact that the full membership
662  is discriminated also per the PID of the process beside mere node
663  ID (and implicitly, group ID); practically, this will be sound in
664  terms of not preventing progress, since all the CPG joiners are
665  also API end-point carriers, and that's what matters locally
666  (who's the winner);
667  remotely, we will just compare leave_list and member_list and if
668  the left process has its node retained in member_list (under some
669  other PID, anyway) we will just ignore it as well
670  XXX: long-term fix is to establish in-out PID-aware tracking? */
671  if (peer) {
672  key = &left_list[i];
673  rival = bsearch(&key, sorted, member_list_entries,
674  sizeof(const struct cpg_address *),
675  cmp_member_list_nodeid);
676  }
677 
678  if (rival == NULL) {
679  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
680  groupName->value, counter, peer_name(peer),
681  left_list[i].nodeid, left_list[i].pid,
682  cpgreason2str(left_list[i].reason));
683  if (peer) {
684  crm_update_peer_proc(__func__, peer, crm_proc_cpg,
685  OFFLINESTATUS);
686  }
687  } else if (left_list[i].nodeid == local_nodeid) {
688  crm_warn("Group %s event %d: duplicate local pid %u left%s",
689  groupName->value, counter,
690  left_list[i].pid, cpgreason2str(left_list[i].reason));
691  } else {
692  crm_warn("Group %s event %d: "
693  "%s (node %u) duplicate pid %u left%s (%u remains)",
694  groupName->value, counter, peer_name(peer),
695  left_list[i].nodeid, left_list[i].pid,
696  cpgreason2str(left_list[i].reason), (*rival)->pid);
697  }
698  }
699  free(sorted);
700  sorted = NULL;
701 
702  for (i = 0; i < joined_list_entries; i++) {
703  crm_info("Group %s event %d: node %u pid %u joined%s",
704  groupName->value, counter, joined_list[i].nodeid,
705  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
706  }
707 
708  for (i = 0; i < member_list_entries; i++) {
709  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
710 
711  if (member_list[i].nodeid == local_nodeid
712  && member_list[i].pid != getpid()) {
713  /* see the note above */
714  crm_warn("Group %s event %d: detected duplicate local pid %u",
715  groupName->value, counter, member_list[i].pid);
716  continue;
717  }
718  crm_info("Group %s event %d: %s (node %u pid %u) is member",
719  groupName->value, counter, peer_name(peer),
720  member_list[i].nodeid, member_list[i].pid);
721 
722  /* If the caller left auto-reaping enabled, this will also update the
723  * state to member.
724  */
725  peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
726  ONLINESTATUS);
727 
728  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
729  /* The node is a CPG member, but we currently think it's not a
730  * cluster member. This is possible only if auto-reaping was
731  * disabled. The node may be joining, and we happened to get the CPG
732  * notification before the quorum notification; or the node may have
733  * just died, and we are processing its final messages; or a bug
734  * has affected the peer cache.
735  */
736  time_t now = time(NULL);
737 
738  if (peer->when_lost == 0) {
739  // Track when we first got into this contradictory state
740  peer->when_lost = now;
741 
742  } else if (now > (peer->when_lost + 60)) {
743  // If it persists for more than a minute, update the state
744  crm_warn("Node %u is member of group %s but was believed offline",
745  member_list[i].nodeid, groupName->value);
746  pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
747  }
748  }
749 
750  if (local_nodeid == member_list[i].nodeid) {
751  found = TRUE;
752  }
753  }
754 
755  if (!found) {
756  crm_err("Local node was evicted from group %s", groupName->value);
757  cpg_evicted = true;
758  }
759 
760  counter++;
761 }
762 
770 gboolean
772 {
773  cs_error_t rc;
774  int fd = -1;
775  int retries = 0;
776  uint32_t id = 0;
777  crm_node_t *peer = NULL;
778  cpg_handle_t handle = 0;
779  const char *message_name = pcmk__message_name(crm_system_name);
780  uid_t found_uid = 0;
781  gid_t found_gid = 0;
782  pid_t found_pid = 0;
783  int rv;
784 
785  struct mainloop_fd_callbacks cpg_fd_callbacks = {
786  .dispatch = pcmk_cpg_dispatch,
787  .destroy = cluster->destroy,
788  };
789 
790  cpg_model_v1_data_t cpg_model_info = {
791  .model = CPG_MODEL_V1,
792  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
793  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
794  .cpg_totem_confchg_fn = NULL,
795  .flags = 0,
796  };
797 
798  cpg_evicted = false;
799  cluster->group.length = 0;
800  cluster->group.value[0] = 0;
801 
802  /* group.value is char[128] */
803  strncpy(cluster->group.value, message_name, 127);
804  cluster->group.value[127] = 0;
805  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
806 
807  cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
808  if (rc != CS_OK) {
809  crm_err("Could not connect to the CPG API: %s (%d)",
810  cs_strerror(rc), rc);
811  goto bail;
812  }
813 
814  rc = cpg_fd_get(handle, &fd);
815  if (rc != CS_OK) {
816  crm_err("Could not obtain the CPG API connection: %s (%d)",
817  cs_strerror(rc), rc);
818  goto bail;
819  }
820 
821  /* CPG provider run as root (in given user namespace, anyway)? */
822  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
823  &found_uid, &found_gid))) {
824  crm_err("CPG provider is not authentic:"
825  " process %lld (uid: %lld, gid: %lld)",
826  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
827  (long long) found_uid, (long long) found_gid);
828  rc = CS_ERR_ACCESS;
829  goto bail;
830  } else if (rv < 0) {
831  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
832  strerror(-rv), -rv);
833  rc = CS_ERR_ACCESS;
834  goto bail;
835  }
836 
837  id = get_local_nodeid(handle);
838  if (id == 0) {
839  crm_err("Could not get local node id from the CPG API");
840  goto bail;
841 
842  }
843  cluster->nodeid = id;
844 
845  retries = 0;
846  cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
847  if (rc != CS_OK) {
848  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
849  goto bail;
850  }
851 
852  pcmk_cpg_handle = handle;
853  cluster->cpg_handle = handle;
854  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
855 
856  bail:
857  if (rc != CS_OK) {
858  cpg_finalize(handle);
859  return FALSE;
860  }
861 
862  peer = crm_get_peer(id, NULL);
864  return TRUE;
865 }
866 
877 gboolean
878 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
879 {
880  gboolean rc = TRUE;
881  char *data = NULL;
882 
883  data = dump_xml_unformatted(msg);
884  rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
885  free(data);
886  return rc;
887 }
888 
901 gboolean
902 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
903  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
904 {
905  static int msg_id = 0;
906  static int local_pid = 0;
907  static int local_name_len = 0;
908  static const char *local_name = NULL;
909 
910  char *target = NULL;
911  struct iovec *iov;
912  pcmk__cpg_msg_t *msg = NULL;
914 
915  switch (msg_class) {
916  case crm_class_cluster:
917  break;
918  default:
919  crm_err("Invalid message class: %d", msg_class);
920  return FALSE;
921  }
922 
923  CRM_CHECK(dest != crm_msg_ais, return FALSE);
924 
925  if (local_name == NULL) {
926  local_name = get_local_node_name();
927  }
928  if ((local_name_len == 0) && (local_name != NULL)) {
929  local_name_len = strlen(local_name);
930  }
931 
932  if (data == NULL) {
933  data = "";
934  }
935 
936  if (local_pid == 0) {
937  local_pid = getpid();
938  }
939 
940  if (sender == crm_msg_none) {
941  sender = local_pid;
942  }
943 
944  msg = calloc(1, sizeof(pcmk__cpg_msg_t));
945 
946  msg_id++;
947  msg->id = msg_id;
948  msg->header.id = msg_class;
949  msg->header.error = CS_OK;
950 
951  msg->host.type = dest;
952  msg->host.local = local;
953 
954  if (node) {
955  if (node->uname) {
956  target = strdup(node->uname);
957  msg->host.size = strlen(node->uname);
958  memset(msg->host.uname, 0, MAX_NAME);
959  memcpy(msg->host.uname, node->uname, msg->host.size);
960  } else {
961  target = crm_strdup_printf("%u", node->id);
962  }
963  msg->host.id = node->id;
964  } else {
965  target = strdup("all");
966  }
967 
968  msg->sender.id = 0;
969  msg->sender.type = sender;
970  msg->sender.pid = local_pid;
971  msg->sender.size = local_name_len;
972  memset(msg->sender.uname, 0, MAX_NAME);
973  if ((local_name != NULL) && (msg->sender.size != 0)) {
974  memcpy(msg->sender.uname, local_name, msg->sender.size);
975  }
976 
977  msg->size = 1 + strlen(data);
978  msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
979 
980  if (msg->size < CRM_BZ2_THRESHOLD) {
981  msg = pcmk__realloc(msg, msg->header.size);
982  memcpy(msg->data, data, msg->size);
983 
984  } else {
985  char *compressed = NULL;
986  unsigned int new_size = 0;
987  char *uncompressed = strdup(data);
988 
989  if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
990  &compressed, &new_size) == pcmk_rc_ok) {
991 
992  msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
993  msg = pcmk__realloc(msg, msg->header.size);
994  memcpy(msg->data, compressed, new_size);
995 
996  msg->is_compressed = TRUE;
997  msg->compressed_size = new_size;
998 
999  } else {
1000  // cppcheck seems not to understand the abort logic in pcmk__realloc
1001  // cppcheck-suppress memleak
1002  msg = pcmk__realloc(msg, msg->header.size);
1003  memcpy(msg->data, data, msg->size);
1004  }
1005 
1006  free(uncompressed);
1007  free(compressed);
1008  }
1009 
1010  iov = calloc(1, sizeof(struct iovec));
1011  iov->iov_base = msg;
1012  iov->iov_len = msg->header.size;
1013 
1014  if (msg->compressed_size) {
1015  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1016  msg->id, target, (unsigned long long) iov->iov_len,
1017  msg->compressed_size, data);
1018  } else {
1019  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1020  msg->id, target, (unsigned long long) iov->iov_len,
1021  msg->size, data);
1022  }
1023  free(target);
1024 
1025  cs_message_queue = g_list_append(cs_message_queue, iov);
1026  crm_cs_flush(&pcmk_cpg_handle);
1027 
1028  return TRUE;
1029 }
1030 
1038 enum crm_ais_msg_types
1039 text2msg_type(const char *text)
1040 {
1041  int type = crm_msg_none;
1042 
1043  CRM_CHECK(text != NULL, return type);
1044  text = pcmk__message_name(text);
1045  if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1046  type = crm_msg_ais;
1047  } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1048  type = crm_msg_cib;
1049  } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1050  type = crm_msg_crmd;
1051  } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1052  type = crm_msg_te;
1053  } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1054  type = crm_msg_pe;
1055  } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1056  type = crm_msg_lrmd;
1057  } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1059  } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1061  } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1062  type = crm_msg_attrd;
1063 
1064  } else {
1065  /* This will normally be a transient client rather than
1066  * a cluster daemon. Set the type to the pid of the client
1067  */
1068  int scan_rc = sscanf(text, "%d", &type);
1069 
1070  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1071  /* Ensure it's sane */
1072  type = crm_msg_none;
1073  }
1074  }
1075  return type;
1076 }
pcmk__cpg_host_t host
Definition: cpg.c:49
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:218
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: messages.c:182
#define cs_repeat(rc, counter, max, code)
Definition: cpg.c:78
uint32_t size
Definition: cpg.c:49
#define crm_notice(fmt, args...)
Definition: logging.h:352
const char * bz2_strerror(int rc)
Definition: results.c:726
char data[0]
Definition: cpg.c:55
crm_ais_msg_types
Definition: cluster.h:103
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:975
uint32_t nodeid
Definition: cluster.h:80
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition: strings.c:955
uint32_t id
Definition: cluster.h:66
pcmk__cpg_host_t sender
Definition: cpg.c:50
const char * get_local_node_name(void)
Get the local node's name.
Definition: cluster.c:155
void(* destroy)(gpointer)
Definition: cluster.h:82
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:53
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition: cpg.c:72
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry.
Definition: membership.c:702
char * crm_system_name
Definition: utils.c:54
#define CS_SEND_MAX
Definition: cpg.c:202
enum crm_ais_msg_types type
Definition: cpg.c:48
char * strerror(int errnum)
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:902
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message.
Definition: cpg.c:444
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:102
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG.
Definition: cpg.c:95
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition: mainloop.h:137
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition: cpg.c:55
gboolean local
Definition: cpg.c:47
#define crm_warn(fmt, args...)
Definition: logging.h:351
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1079
int rc
Definition: pcmk_fence.c:35
uint32_t pid
Definition: cpg.c:46
#define crm_debug(fmt, args...)
Definition: logging.h:355
time_t when_lost
Definition: cluster.h:67
#define crm_trace(fmt, args...)
Definition: logging.h:356
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
Definition: membership.c:562
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:876
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define CRM_SYSTEM_PENGINE
Definition: crm.h:108
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:748
#define CRM_NODE_MEMBER
Definition: cluster.h:33
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:878
uint32_t id
Definition: cpg.c:45
struct pcmk__cpg_host_s __attribute__((packed))
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition: crm.h:76
uint32_t compressed_size
Definition: cpg.c:53
#define CRM_SYSTEM_CRMD
Definition: crm.h:106
const char * target
Definition: pcmk_fence.c:29
crm_ais_msg_class
Definition: cluster.h:99
#define CRM_XS
Definition: logging.h:54
#define CRM_SYSTEM_STONITHD
Definition: crm.h:110
#define CRM_SYSTEM_CIB
Definition: crm.h:105
#define CRM_SYSTEM_TENGINE
Definition: crm.h:109
gboolean is_compressed
Definition: cpg.c:47
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG)
Definition: cpg.c:117
#define crm_err(fmt, args...)
Definition: logging.h:350
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:181
#define CRM_ASSERT(expr)
Definition: results.h:42
#define OFFLINESTATUS
Definition: util.h:39
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string.
Definition: cpg.c:1039
#define CRM_BZ2_THRESHOLD
Definition: xml.h:47
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2017
#define CRM_SYSTEM_LRMD
Definition: crm.h:107
char uname[MAX_NAME]
Definition: cpg.c:50
char * state
Definition: cluster.h:55
#define msg_data_len(msg)
Definition: cpg.c:76
IPC interface to Pacemaker daemons.
char * uname
Definition: cluster.h:53
#define ONLINESTATUS
Definition: util.h:38
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event.
Definition: cpg.c:629
#define crm_info(fmt, args...)
Definition: logging.h:353
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG.
Definition: cpg.c:771
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
Definition: ipc_client.c:1382