pacemaker  2.0.5-ba59be712
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2020 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24 
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29 
30 #include <crm/msg_xml.h>
31 
32 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
39 #define cs_repeat(counter, max, code) do { \
40  code; \
41  if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
42  counter++; \
43  crm_debug("Retrying operation after %ds", counter); \
44  sleep(counter); \
45  } else { \
46  break; \
47  } \
48  } while(counter < max)
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82  if(handle == 0) {
83  crm_trace("Creating connection");
84  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
85  if (rc != CS_OK) {
86  crm_err("Could not connect to the CPG API: %s (%d)",
87  cs_strerror(rc), rc);
88  return 0;
89  }
90 
91  rc = cpg_fd_get(local_handle, &fd);
92  if (rc != CS_OK) {
93  crm_err("Could not obtain the CPG API connection: %s (%d)",
94  cs_strerror(rc), rc);
95  goto bail;
96  }
97 
98  /* CPG provider run as root (in given user namespace, anyway)? */
99  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
100  &found_uid, &found_gid))) {
101  crm_err("CPG provider is not authentic:"
102  " process %lld (uid: %lld, gid: %lld)",
103  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
104  (long long) found_uid, (long long) found_gid);
105  goto bail;
106  } else if (rv < 0) {
107  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
108  strerror(-rv), -rv);
109  goto bail;
110  }
111  }
112 
113  if (rc == CS_OK) {
114  retries = 0;
115  crm_trace("Performing lookup");
116  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
117  }
118 
119  if (rc != CS_OK) {
120  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
121  }
122 
123 bail:
124  if(handle == 0) {
125  crm_trace("Closing connection");
126  cpg_finalize(local_handle);
127  }
128  crm_debug("Local nodeid is %u", local_nodeid);
129  return local_nodeid;
130 }
131 
132 
135 
136 static ssize_t crm_cs_flush(gpointer data);
137 
138 static gboolean
139 crm_cs_flush_cb(gpointer data)
140 {
141  cs_message_timer = 0;
142  crm_cs_flush(data);
143  return FALSE;
144 }
145 
146 #define CS_SEND_MAX 200
147 static ssize_t
148 crm_cs_flush(gpointer data)
149 {
150  int sent = 0;
151  ssize_t rc = 0;
152  int queue_len = 0;
153  static unsigned int last_sent = 0;
154  cpg_handle_t *handle = (cpg_handle_t *)data;
155 
156  if (*handle == 0) {
157  crm_trace("Connection is dead");
158  return pcmk_ok;
159  }
160 
161  queue_len = g_list_length(cs_message_queue);
162  if ((queue_len % 1000) == 0 && queue_len > 1) {
163  crm_err("CPG queue has grown to %d", queue_len);
164 
165  } else if (queue_len == CS_SEND_MAX) {
166  crm_warn("CPG queue has grown to %d", queue_len);
167  }
168 
169  if (cs_message_timer) {
170  /* There is already a timer, wait until it goes off */
171  crm_trace("Timer active %d", cs_message_timer);
172  return pcmk_ok;
173  }
174 
175  while (cs_message_queue && sent < CS_SEND_MAX) {
176  struct iovec *iov = cs_message_queue->data;
177 
178  errno = 0;
179  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
180 
181  if (rc != CS_OK) {
182  break;
183  }
184 
185  sent++;
186  last_sent++;
187  crm_trace("CPG message sent, size=%llu",
188  (unsigned long long) iov->iov_len);
189 
190  cs_message_queue = g_list_remove(cs_message_queue, iov);
191  free(iov->iov_base);
192  free(iov);
193  }
194 
195  queue_len -= sent;
196  if (sent > 1 || cs_message_queue) {
197  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
198  sent, queue_len, last_sent, ais_error2text(rc),
199  (long long) rc);
200  } else {
201  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
202  sent, queue_len, last_sent, ais_error2text(rc),
203  (long long) rc);
204  }
205 
206  if (cs_message_queue) {
207  uint32_t delay_ms = 100;
208  if(rc != CS_OK) {
209  /* Proportionally more if sending failed but cap at 1s */
210  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
211  }
212  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
213  }
214 
215  return rc;
216 }
217 
218 gboolean
219 send_cpg_iov(struct iovec * iov)
220 {
221  static unsigned int queued = 0;
222 
223  queued++;
224  crm_trace("Queueing CPG message %u (%llu bytes)",
225  queued, (unsigned long long) iov->iov_len);
226  cs_message_queue = g_list_append(cs_message_queue, iov);
227  crm_cs_flush(&pcmk_cpg_handle);
228  return TRUE;
229 }
230 
231 static int
232 pcmk_cpg_dispatch(gpointer user_data)
233 {
234  int rc = 0;
235  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
236 
237  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
238  if (rc != CS_OK) {
239  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
240  cpg_finalize(cluster->cpg_handle);
241  cluster->cpg_handle = 0;
242  return -1;
243 
244  } else if(cpg_evicted) {
245  crm_err("Evicted from CPG membership");
246  return -1;
247  }
248  return 0;
249 }
250 
251 char *
252 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
253  uint32_t *kind, const char **from)
254 {
255  char *data = NULL;
256  AIS_Message *msg = (AIS_Message *) content;
257 
258  if(handle) {
259  // Do filtering and field massaging
260  uint32_t local_nodeid = get_local_nodeid(handle);
261  const char *local_name = get_local_node_name();
262 
263  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
264  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
265  return NULL;
266 
267  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
268  /* Not for us */
269  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
270  return NULL;
271  } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
272  /* Not for us */
273  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
274  return NULL;
275  }
276 
277  msg->sender.id = nodeid;
278  if (msg->sender.size == 0) {
279  crm_node_t *peer = crm_get_peer(nodeid, NULL);
280 
281  if (peer == NULL) {
282  crm_err("Peer with nodeid=%u is unknown", nodeid);
283 
284  } else if (peer->uname == NULL) {
285  crm_err("No uname for peer with nodeid=%u", nodeid);
286 
287  } else {
288  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
289  msg->sender.size = strlen(peer->uname);
290  memset(msg->sender.uname, 0, MAX_NAME);
291  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
292  }
293  }
294  }
295 
296  crm_trace("Got new%s message (size=%d, %d, %d)",
297  msg->is_compressed ? " compressed" : "",
298  ais_data_len(msg), msg->size, msg->compressed_size);
299 
300  if (kind != NULL) {
301  *kind = msg->header.id;
302  }
303  if (from != NULL) {
304  *from = msg->sender.uname;
305  }
306 
307  if (msg->is_compressed && msg->size > 0) {
308  int rc = BZ_OK;
309  char *uncompressed = NULL;
310  unsigned int new_size = msg->size + 1;
311 
312  if (check_message_sanity(msg, NULL) == FALSE) {
313  goto badmsg;
314  }
315 
316  crm_trace("Decompressing message data");
317  uncompressed = calloc(1, new_size);
318  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
319 
320  if (rc != BZ_OK) {
321  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
322  bz2_strerror(rc), rc);
323  free(uncompressed);
324  goto badmsg;
325  }
326 
327  CRM_ASSERT(rc == BZ_OK);
328  CRM_ASSERT(new_size == msg->size);
329 
330  data = uncompressed;
331 
332  } else if (check_message_sanity(msg, data) == FALSE) {
333  goto badmsg;
334 
335  } else if (pcmk__str_eq("identify", data, pcmk__str_casei)) {
336  char *pid_s = pcmk__getpid_s();
337 
338  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
339  free(pid_s);
340  return NULL;
341 
342  } else {
343  data = strdup(msg->data);
344  }
345 
346  // Is this necessary?
347  crm_get_peer(msg->sender.id, msg->sender.uname);
348 
349  crm_trace("Payload: %.200s", data);
350  return data;
351 
352  badmsg:
353  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
354  " min=%d, total=%d, size=%d, bz2_size=%d",
355  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
356  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
357  msg->sender.pid, (int)sizeof(AIS_Message),
358  msg->header.size, msg->size, msg->compressed_size);
359 
360  free(data);
361  return NULL;
362 }
363 
364 static int cmp_member_list_nodeid(const void *first,
365  const void *second)
366 {
367  const struct cpg_address *const a = *((const struct cpg_address **) first),
368  *const b = *((const struct cpg_address **) second);
369  if (a->nodeid < b->nodeid) {
370  return -1;
371  } else if (a->nodeid > b->nodeid) {
372  return 1;
373  }
374  /* don't bother with "reason" nor "pid" */
375  return 0;
376 }
377 
378 static const char *
379 cpgreason2str(cpg_reason_t reason)
380 {
381  switch (reason) {
382  case CPG_REASON_JOIN: return " via cpg_join";
383  case CPG_REASON_LEAVE: return " via cpg_leave";
384  case CPG_REASON_NODEDOWN: return " via cluster exit";
385  case CPG_REASON_NODEUP: return " via cluster join";
386  case CPG_REASON_PROCDOWN: return " for unknown reason";
387  default: break;
388  }
389  return "";
390 }
391 
392 static inline const char *
393 peer_name(crm_node_t *peer)
394 {
395  if (peer == NULL) {
396  return "unknown node";
397  } else if (peer->uname == NULL) {
398  return "peer node";
399  } else {
400  return peer->uname;
401  }
402 }
403 
404 void
405 pcmk_cpg_membership(cpg_handle_t handle,
406  const struct cpg_name *groupName,
407  const struct cpg_address *member_list, size_t member_list_entries,
408  const struct cpg_address *left_list, size_t left_list_entries,
409  const struct cpg_address *joined_list, size_t joined_list_entries)
410 {
411  int i;
412  gboolean found = FALSE;
413  static int counter = 0;
414  uint32_t local_nodeid = get_local_nodeid(handle);
415  const struct cpg_address *key, **sorted;
416 
417  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
418  CRM_ASSERT(sorted != NULL);
419 
420  for (size_t iter = 0; iter < member_list_entries; iter++) {
421  sorted[iter] = member_list + iter;
422  }
423  /* so that the cross-matching multiply-subscribed nodes is then cheap */
424  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
425  cmp_member_list_nodeid);
426 
427  for (i = 0; i < left_list_entries; i++) {
428  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
429  const struct cpg_address **rival = NULL;
430 
431  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
432  and not playing by this rule may go wild in case of multiple
433  residual instances of the same pacemaker daemon at the same node
434  -- we must ensure that the possible local rival(s) won't make us
435  cry out and bail (e.g. when they quit themselves), since all the
436  surrounding logic denies this simple fact that the full membership
437  is discriminated also per the PID of the process beside mere node
438  ID (and implicitly, group ID); practically, this will be sound in
439  terms of not preventing progress, since all the CPG joiners are
440  also API end-point carriers, and that's what matters locally
441  (who's the winner);
442  remotely, we will just compare leave_list and member_list and if
443  the left process has its node retained in member_list (under some
444  other PID, anyway) we will just ignore it as well
445  XXX: long-term fix is to establish in-out PID-aware tracking? */
446  if (peer) {
447  key = &left_list[i];
448  rival = bsearch(&key, sorted, member_list_entries,
449  sizeof(const struct cpg_address *),
450  cmp_member_list_nodeid);
451  }
452 
453  if (rival == NULL) {
454  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
455  groupName->value, counter, peer_name(peer),
456  left_list[i].nodeid, left_list[i].pid,
457  cpgreason2str(left_list[i].reason));
458  if (peer) {
459  crm_update_peer_proc(__func__, peer, crm_proc_cpg,
460  OFFLINESTATUS);
461  }
462  } else if (left_list[i].nodeid == local_nodeid) {
463  crm_warn("Group %s event %d: duplicate local pid %u left%s",
464  groupName->value, counter,
465  left_list[i].pid, cpgreason2str(left_list[i].reason));
466  } else {
467  crm_warn("Group %s event %d: "
468  "%s (node %u) duplicate pid %u left%s (%u remains)",
469  groupName->value, counter, peer_name(peer),
470  left_list[i].nodeid, left_list[i].pid,
471  cpgreason2str(left_list[i].reason), (*rival)->pid);
472  }
473  }
474  free(sorted);
475  sorted = NULL;
476 
477  for (i = 0; i < joined_list_entries; i++) {
478  crm_info("Group %s event %d: node %u pid %u joined%s",
479  groupName->value, counter, joined_list[i].nodeid,
480  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
481  }
482 
483  for (i = 0; i < member_list_entries; i++) {
484  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
485 
486  if (member_list[i].nodeid == local_nodeid
487  && member_list[i].pid != getpid()) {
488  /* see the note above */
489  crm_warn("Group %s event %d: detected duplicate local pid %u",
490  groupName->value, counter, member_list[i].pid);
491  continue;
492  }
493  crm_info("Group %s event %d: %s (node %u pid %u) is member",
494  groupName->value, counter, peer_name(peer),
495  member_list[i].nodeid, member_list[i].pid);
496 
497  /* If the caller left auto-reaping enabled, this will also update the
498  * state to member.
499  */
500  peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
501  ONLINESTATUS);
502 
503  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
504  /* The node is a CPG member, but we currently think it's not a
505  * cluster member. This is possible only if auto-reaping was
506  * disabled. The node may be joining, and we happened to get the CPG
507  * notification before the quorum notification; or the node may have
508  * just died, and we are processing its final messages; or a bug
509  * has affected the peer cache.
510  */
511  time_t now = time(NULL);
512 
513  if (peer->when_lost == 0) {
514  // Track when we first got into this contradictory state
515  peer->when_lost = now;
516 
517  } else if (now > (peer->when_lost + 60)) {
518  // If it persists for more than a minute, update the state
519  crm_warn("Node %u is member of group %s but was believed offline",
520  member_list[i].nodeid, groupName->value);
521  crm_update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
522  }
523  }
524 
525  if (local_nodeid == member_list[i].nodeid) {
526  found = TRUE;
527  }
528  }
529 
530  if (!found) {
531  crm_err("Local node was evicted from group %s", groupName->value);
532  cpg_evicted = TRUE;
533  }
534 
535  counter++;
536 }
537 
538 gboolean
540 {
541  cs_error_t rc;
542  int fd = -1;
543  int retries = 0;
544  uint32_t id = 0;
545  crm_node_t *peer = NULL;
546  cpg_handle_t handle = 0;
547  const char *message_name = pcmk__message_name(crm_system_name);
548  uid_t found_uid = 0;
549  gid_t found_gid = 0;
550  pid_t found_pid = 0;
551  int rv;
552 
553  struct mainloop_fd_callbacks cpg_fd_callbacks = {
554  .dispatch = pcmk_cpg_dispatch,
555  .destroy = cluster->destroy,
556  };
557 
558  cpg_callbacks_t cpg_callbacks = {
559  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
560  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
561  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
562  /* .cpg_confchg_fn = pcmk_cpg_membership, */
563  };
564 
565  cpg_evicted = FALSE;
566  cluster->group.length = 0;
567  cluster->group.value[0] = 0;
568 
569  /* group.value is char[128] */
570  strncpy(cluster->group.value, message_name, 127);
571  cluster->group.value[127] = 0;
572  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
573 
574  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
575  if (rc != CS_OK) {
576  crm_err("Could not connect to the CPG API: %s (%d)",
577  cs_strerror(rc), rc);
578  goto bail;
579  }
580 
581  rc = cpg_fd_get(handle, &fd);
582  if (rc != CS_OK) {
583  crm_err("Could not obtain the CPG API connection: %s (%d)",
584  cs_strerror(rc), rc);
585  goto bail;
586  }
587 
588  /* CPG provider run as root (in given user namespace, anyway)? */
589  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
590  &found_uid, &found_gid))) {
591  crm_err("CPG provider is not authentic:"
592  " process %lld (uid: %lld, gid: %lld)",
593  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
594  (long long) found_uid, (long long) found_gid);
595  rc = CS_ERR_ACCESS;
596  goto bail;
597  } else if (rv < 0) {
598  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
599  strerror(-rv), -rv);
600  rc = CS_ERR_ACCESS;
601  goto bail;
602  }
603 
604  id = get_local_nodeid(handle);
605  if (id == 0) {
606  crm_err("Could not get local node id from the CPG API");
607  goto bail;
608 
609  }
610  cluster->nodeid = id;
611 
612  retries = 0;
613  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
614  if (rc != CS_OK) {
615  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
616  goto bail;
617  }
618 
619  pcmk_cpg_handle = handle;
620  cluster->cpg_handle = handle;
621  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
622 
623  bail:
624  if (rc != CS_OK) {
625  cpg_finalize(handle);
626  return FALSE;
627  }
628 
629  peer = crm_get_peer(id, NULL);
631  return TRUE;
632 }
633 
634 gboolean
635 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
636 {
637  gboolean rc = TRUE;
638  char *data = NULL;
639 
640  data = dump_xml_unformatted(msg);
642  free(data);
643  return rc;
644 }
645 
646 gboolean
647 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
648  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
649 {
650  static int msg_id = 0;
651  static int local_pid = 0;
652  static int local_name_len = 0;
653  static const char *local_name = NULL;
654 
655  char *target = NULL;
656  struct iovec *iov;
657  AIS_Message *msg = NULL;
659 
660  switch (msg_class) {
661  case crm_class_cluster:
662  break;
663  default:
664  crm_err("Invalid message class: %d", msg_class);
665  return FALSE;
666  }
667 
668  CRM_CHECK(dest != crm_msg_ais, return FALSE);
669 
670  if(local_name == NULL) {
671  local_name = get_local_node_name();
672  }
673  if(local_name_len == 0 && local_name) {
674  local_name_len = strlen(local_name);
675  }
676 
677  if (data == NULL) {
678  data = "";
679  }
680 
681  if (local_pid == 0) {
682  local_pid = getpid();
683  }
684 
685  if (sender == crm_msg_none) {
686  sender = local_pid;
687  }
688 
689  msg = calloc(1, sizeof(AIS_Message));
690 
691  msg_id++;
692  msg->id = msg_id;
693  msg->header.id = msg_class;
694  msg->header.error = CS_OK;
695 
696  msg->host.type = dest;
697  msg->host.local = local;
698 
699  if (node) {
700  if (node->uname) {
701  target = strdup(node->uname);
702  msg->host.size = strlen(node->uname);
703  memset(msg->host.uname, 0, MAX_NAME);
704  memcpy(msg->host.uname, node->uname, msg->host.size);
705  } else {
706  target = crm_strdup_printf("%u", node->id);
707  }
708  msg->host.id = node->id;
709  } else {
710  target = strdup("all");
711  }
712 
713  msg->sender.id = 0;
714  msg->sender.type = sender;
715  msg->sender.pid = local_pid;
716  msg->sender.size = local_name_len;
717  memset(msg->sender.uname, 0, MAX_NAME);
718  if(local_name && msg->sender.size) {
719  memcpy(msg->sender.uname, local_name, msg->sender.size);
720  }
721 
722  msg->size = 1 + strlen(data);
723  msg->header.size = sizeof(AIS_Message) + msg->size;
724 
725  if (msg->size < CRM_BZ2_THRESHOLD) {
726  msg = pcmk__realloc(msg, msg->header.size);
727  memcpy(msg->data, data, msg->size);
728 
729  } else {
730  char *compressed = NULL;
731  unsigned int new_size = 0;
732  char *uncompressed = strdup(data);
733 
734  if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
735  &compressed, &new_size) == pcmk_rc_ok) {
736 
737  msg->header.size = sizeof(AIS_Message) + new_size;
738  msg = pcmk__realloc(msg, msg->header.size);
739  memcpy(msg->data, compressed, new_size);
740 
741  msg->is_compressed = TRUE;
742  msg->compressed_size = new_size;
743 
744  } else {
745  // cppcheck seems not to understand the abort logic in pcmk__realloc
746  // cppcheck-suppress memleak
747  msg = pcmk__realloc(msg, msg->header.size);
748  memcpy(msg->data, data, msg->size);
749  }
750 
751  free(uncompressed);
752  free(compressed);
753  }
754 
755  iov = calloc(1, sizeof(struct iovec));
756  iov->iov_base = msg;
757  iov->iov_len = msg->header.size;
758 
759  if (msg->compressed_size) {
760  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
761  msg->id, target, (unsigned long long) iov->iov_len,
762  msg->compressed_size, data);
763  } else {
764  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
765  msg->id, target, (unsigned long long) iov->iov_len,
766  msg->size, data);
767  }
768  free(target);
769 
770  send_cpg_iov(iov);
771 
772  return TRUE;
773 }
774 
776 text2msg_type(const char *text)
777 {
778  int type = crm_msg_none;
779 
780  CRM_CHECK(text != NULL, return type);
781  text = pcmk__message_name(text);
782  if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
783  type = crm_msg_ais;
784  } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
785  type = crm_msg_cib;
786  } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
787  type = crm_msg_crmd;
788  } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
789  type = crm_msg_te;
790  } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
791  type = crm_msg_pe;
792  } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
793  type = crm_msg_lrmd;
794  } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
796  } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
798  } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
800 
801  } else {
802  /* This will normally be a transient client rather than
803  * a cluster daemon. Set the type to the pid of the client
804  */
805  int scan_rc = sscanf(text, "%d", &type);
806 
807  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
808  /* Ensure it's sane */
809  type = crm_msg_none;
810  }
811  }
812  return type;
813 }
enum crm_ais_msg_types type
Definition: internal.h:37
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:215
char data[0]
Definition: internal.h:66
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: messages.c:182
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:219
#define crm_notice(fmt, args...)
Definition: logging.h:349
gboolean is_compressed
Definition: internal.h:58
const char * bz2_strerror(int rc)
Definition: results.c:726
uint32_t size
Definition: internal.h:63
crm_ais_msg_types
Definition: cluster.h:99
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:915
uint32_t nodeid
Definition: cluster.h:79
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition: strings.c:842
uint32_t id
Definition: cluster.h:65
const char * get_local_node_name(void)
Definition: cluster.c:120
void(* destroy)(gpointer)
Definition: cluster.h:81
uint32_t id
Definition: internal.h:34
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:50
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:654
char * crm_system_name
Definition: utils.c:54
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:978
#define CS_SEND_MAX
Definition: cpg.c:146
uint32_t pid
Definition: internal.h:81
char * strerror(int errnum)
AIS_Host sender
Definition: internal.h:85
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:647
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:252
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:98
uint32_t id
Definition: internal.h:80
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:115
int cs_message_timer
Definition: cpg.c:134
#define crm_warn(fmt, args...)
Definition: logging.h:348
int rc
Definition: pcmk_fence.c:35
#define crm_debug(fmt, args...)
Definition: logging.h:352
time_t when_lost
Definition: cluster.h:66
GListPtr cs_message_queue
Definition: cpg.c:133
#define crm_trace(fmt, args...)
Definition: logging.h:353
gboolean local
Definition: internal.h:36
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:786
#define CRM_SYSTEM_PENGINE
Definition: crm.h:104
AIS_Host sender
Definition: internal.h:61
uint32_t id
Definition: internal.h:57
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: corosync.c:421
struct crm_ais_msg_s AIS_Message
Definition: internal.h:31
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
#define ais_data_len(msg)
Definition: internal.h:152
uint32_t size
Definition: internal.h:38
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:663
#define CRM_NODE_MEMBER
Definition: cluster.h:34
uint32_t compressed_size
Definition: internal.h:64
#define MAX_NAME
Definition: crm.h:60
#define CRM_SYSTEM_CRMD
Definition: crm.h:102
const char * target
Definition: pcmk_fence.c:29
crm_ais_msg_class
Definition: cluster.h:95
#define CRM_XS
Definition: logging.h:54
#define CRM_SYSTEM_STONITHD
Definition: crm.h:106
#define CRM_SYSTEM_CIB
Definition: crm.h:101
#define CRM_SYSTEM_TENGINE
Definition: crm.h:105
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
#define crm_err(fmt, args...)
Definition: logging.h:347
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:153
#define CRM_ASSERT(expr)
Definition: results.h:42
char uname[MAX_NAME]
Definition: internal.h:39
#define OFFLINESTATUS
Definition: util.h:38
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:776
#define CRM_BZ2_THRESHOLD
Definition: xml.h:47
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2000
#define CRM_SYSTEM_LRMD
Definition: crm.h:103
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:635
char data[0]
Definition: internal.h:90
char * state
Definition: cluster.h:59
#define pcmk_ok
Definition: results.h:67
IPC interface to Pacemaker daemons.
uint32_t pid
Definition: internal.h:35
char * uname
Definition: cluster.h:57
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
AIS_Host host
Definition: internal.h:60
#define ONLINESTATUS
Definition: util.h:37
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:522
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:405
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
GList * GListPtr
Definition: crm.h:214
#define crm_info(fmt, args...)
Definition: logging.h:350
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:539
enum crm_ais_msg_types type
Definition: internal.h:83
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc_client.c:1294
gboolean local
Definition: internal.h:82