pacemaker  2.0.4-2deceaa
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2020 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24 
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29 
30 #include <crm/msg_xml.h>
31 
32 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
/* Run a corosync API call, retrying while corosync asks us to back off.
 *
 * counter: integer lvalue counting retries so far (caller initializes it)
 * max:     maximum number of attempts
 * code:    statement performing the call; it must store the call's result in
 *          a variable named "rc" in the caller's scope
 *
 * The call is repeated while rc is CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL,
 * sleeping "counter" seconds between attempts (linear backoff). No sleep is
 * done after the final failed attempt, because nothing would be retried.
 */
#define cs_repeat(counter, max, code) do {                              \
        code;                                                           \
        if ((rc != CS_ERR_TRY_AGAIN) && (rc != CS_ERR_QUEUE_FULL)) {    \
            break;                                                      \
        }                                                               \
        (counter)++;                                                    \
        if ((counter) >= (max)) {                                       \
            break;                                                      \
        }                                                               \
        crm_debug("Retrying operation after %ds", (counter));           \
        sleep(counter);                                                 \
    } while (1)
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82  if(handle == 0) {
83  crm_trace("Creating connection");
84  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
85  if (rc != CS_OK) {
86  crm_err("Could not connect to the CPG API: %s (%d)",
87  cs_strerror(rc), rc);
88  return 0;
89  }
90 
91  rc = cpg_fd_get(local_handle, &fd);
92  if (rc != CS_OK) {
93  crm_err("Could not obtain the CPG API connection: %s (%d)",
94  cs_strerror(rc), rc);
95  goto bail;
96  }
97 
98  /* CPG provider run as root (in given user namespace, anyway)? */
99  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
100  &found_uid, &found_gid))) {
101  crm_err("CPG provider is not authentic:"
102  " process %lld (uid: %lld, gid: %lld)",
103  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
104  (long long) found_uid, (long long) found_gid);
105  goto bail;
106  } else if (rv < 0) {
107  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
108  strerror(-rv), -rv);
109  goto bail;
110  }
111  }
112 
113  if (rc == CS_OK) {
114  retries = 0;
115  crm_trace("Performing lookup");
116  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
117  }
118 
119  if (rc != CS_OK) {
120  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
121  }
122 
123 bail:
124  if(handle == 0) {
125  crm_trace("Closing connection");
126  cpg_finalize(local_handle);
127  }
128  crm_debug("Local nodeid is %u", local_nodeid);
129  return local_nodeid;
130 }
131 
132 
135 
136 static ssize_t crm_cs_flush(gpointer data);
137 
138 static gboolean
139 crm_cs_flush_cb(gpointer data)
140 {
141  cs_message_timer = 0;
142  crm_cs_flush(data);
143  return FALSE;
144 }
145 
146 #define CS_SEND_MAX 200
147 static ssize_t
148 crm_cs_flush(gpointer data)
149 {
150  int sent = 0;
151  ssize_t rc = 0;
152  int queue_len = 0;
153  static unsigned int last_sent = 0;
154  cpg_handle_t *handle = (cpg_handle_t *)data;
155 
156  if (*handle == 0) {
157  crm_trace("Connection is dead");
158  return pcmk_ok;
159  }
160 
161  queue_len = g_list_length(cs_message_queue);
162  if ((queue_len % 1000) == 0 && queue_len > 1) {
163  crm_err("CPG queue has grown to %d", queue_len);
164 
165  } else if (queue_len == CS_SEND_MAX) {
166  crm_warn("CPG queue has grown to %d", queue_len);
167  }
168 
169  if (cs_message_timer) {
170  /* There is already a timer, wait until it goes off */
171  crm_trace("Timer active %d", cs_message_timer);
172  return pcmk_ok;
173  }
174 
175  while (cs_message_queue && sent < CS_SEND_MAX) {
176  struct iovec *iov = cs_message_queue->data;
177 
178  errno = 0;
179  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
180 
181  if (rc != CS_OK) {
182  break;
183  }
184 
185  sent++;
186  last_sent++;
187  crm_trace("CPG message sent, size=%llu",
188  (unsigned long long) iov->iov_len);
189 
190  cs_message_queue = g_list_remove(cs_message_queue, iov);
191  free(iov->iov_base);
192  free(iov);
193  }
194 
195  queue_len -= sent;
196  if (sent > 1 || cs_message_queue) {
197  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
198  sent, queue_len, last_sent, ais_error2text(rc),
199  (long long) rc);
200  } else {
201  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
202  sent, queue_len, last_sent, ais_error2text(rc),
203  (long long) rc);
204  }
205 
206  if (cs_message_queue) {
207  uint32_t delay_ms = 100;
208  if(rc != CS_OK) {
209  /* Proportionally more if sending failed but cap at 1s */
210  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
211  }
212  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
213  }
214 
215  return rc;
216 }
217 
218 gboolean
219 send_cpg_iov(struct iovec * iov)
220 {
221  static unsigned int queued = 0;
222 
223  queued++;
224  crm_trace("Queueing CPG message %u (%llu bytes)",
225  queued, (unsigned long long) iov->iov_len);
226  cs_message_queue = g_list_append(cs_message_queue, iov);
227  crm_cs_flush(&pcmk_cpg_handle);
228  return TRUE;
229 }
230 
231 static int
232 pcmk_cpg_dispatch(gpointer user_data)
233 {
234  int rc = 0;
235  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
236 
237  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
238  if (rc != CS_OK) {
239  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
240  cluster->cpg_handle = 0;
241  return -1;
242 
243  } else if(cpg_evicted) {
244  crm_err("Evicted from CPG membership");
245  return -1;
246  }
247  return 0;
248 }
249 
250 char *
251 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
252  uint32_t *kind, const char **from)
253 {
254  char *data = NULL;
255  AIS_Message *msg = (AIS_Message *) content;
256 
257  if(handle) {
258  // Do filtering and field massaging
259  uint32_t local_nodeid = get_local_nodeid(handle);
260  const char *local_name = get_local_node_name();
261 
262  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
263  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
264  return NULL;
265 
266  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
267  /* Not for us */
268  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
269  return NULL;
270  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
271  /* Not for us */
272  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
273  return NULL;
274  }
275 
276  msg->sender.id = nodeid;
277  if (msg->sender.size == 0) {
278  crm_node_t *peer = crm_get_peer(nodeid, NULL);
279 
280  if (peer == NULL) {
281  crm_err("Peer with nodeid=%u is unknown", nodeid);
282 
283  } else if (peer->uname == NULL) {
284  crm_err("No uname for peer with nodeid=%u", nodeid);
285 
286  } else {
287  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
288  msg->sender.size = strlen(peer->uname);
289  memset(msg->sender.uname, 0, MAX_NAME);
290  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
291  }
292  }
293  }
294 
295  crm_trace("Got new%s message (size=%d, %d, %d)",
296  msg->is_compressed ? " compressed" : "",
297  ais_data_len(msg), msg->size, msg->compressed_size);
298 
299  if (kind != NULL) {
300  *kind = msg->header.id;
301  }
302  if (from != NULL) {
303  *from = msg->sender.uname;
304  }
305 
306  if (msg->is_compressed && msg->size > 0) {
307  int rc = BZ_OK;
308  char *uncompressed = NULL;
309  unsigned int new_size = msg->size + 1;
310 
311  if (check_message_sanity(msg, NULL) == FALSE) {
312  goto badmsg;
313  }
314 
315  crm_trace("Decompressing message data");
316  uncompressed = calloc(1, new_size);
317  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
318 
319  if (rc != BZ_OK) {
320  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
321  bz2_strerror(rc), rc);
322  free(uncompressed);
323  goto badmsg;
324  }
325 
326  CRM_ASSERT(rc == BZ_OK);
327  CRM_ASSERT(new_size == msg->size);
328 
329  data = uncompressed;
330 
331  } else if (check_message_sanity(msg, data) == FALSE) {
332  goto badmsg;
333 
334  } else if (safe_str_eq("identify", data)) {
335  char *pid_s = pcmk__getpid_s();
336 
337  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
338  free(pid_s);
339  return NULL;
340 
341  } else {
342  data = strdup(msg->data);
343  }
344 
345  // Is this necessary?
346  crm_get_peer(msg->sender.id, msg->sender.uname);
347 
348  crm_trace("Payload: %.200s", data);
349  return data;
350 
351  badmsg:
352  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
353  " min=%d, total=%d, size=%d, bz2_size=%d",
354  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
355  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
356  msg->sender.pid, (int)sizeof(AIS_Message),
357  msg->header.size, msg->size, msg->compressed_size);
358 
359  free(data);
360  return NULL;
361 }
362 
363 static int cmp_member_list_nodeid(const void *first,
364  const void *second)
365 {
366  const struct cpg_address *const a = *((const struct cpg_address **) first),
367  *const b = *((const struct cpg_address **) second);
368  if (a->nodeid < b->nodeid) {
369  return -1;
370  } else if (a->nodeid > b->nodeid) {
371  return 1;
372  }
373  /* don't bother with "reason" nor "pid" */
374  return 0;
375 }
376 
377 static const char *
378 cpgreason2str(cpg_reason_t reason)
379 {
380  switch (reason) {
381  case CPG_REASON_JOIN: return " via cpg_join";
382  case CPG_REASON_LEAVE: return " via cpg_leave";
383  case CPG_REASON_NODEDOWN: return " via cluster exit";
384  case CPG_REASON_NODEUP: return " via cluster join";
385  case CPG_REASON_PROCDOWN: return " for unknown reason";
386  default: break;
387  }
388  return "";
389 }
390 
391 static inline const char *
392 peer_name(crm_node_t *peer)
393 {
394  if (peer == NULL) {
395  return "unknown node";
396  } else if (peer->uname == NULL) {
397  return "peer node";
398  } else {
399  return peer->uname;
400  }
401 }
402 
403 void
404 pcmk_cpg_membership(cpg_handle_t handle,
405  const struct cpg_name *groupName,
406  const struct cpg_address *member_list, size_t member_list_entries,
407  const struct cpg_address *left_list, size_t left_list_entries,
408  const struct cpg_address *joined_list, size_t joined_list_entries)
409 {
410  int i;
411  gboolean found = FALSE;
412  static int counter = 0;
413  uint32_t local_nodeid = get_local_nodeid(handle);
414  const struct cpg_address *key, **sorted;
415 
416  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
417  CRM_ASSERT(sorted != NULL);
418 
419  for (size_t iter = 0; iter < member_list_entries; iter++) {
420  sorted[iter] = member_list + iter;
421  }
422  /* so that the cross-matching multiply-subscribed nodes is then cheap */
423  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
424  cmp_member_list_nodeid);
425 
426  for (i = 0; i < left_list_entries; i++) {
427  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
428  const struct cpg_address **rival = NULL;
429 
430  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
431  and not playing by this rule may go wild in case of multiple
432  residual instances of the same pacemaker daemon at the same node
433  -- we must ensure that the possible local rival(s) won't make us
434  cry out and bail (e.g. when they quit themselves), since all the
435  surrounding logic denies this simple fact that the full membership
436  is discriminated also per the PID of the process beside mere node
437  ID (and implicitly, group ID); practically, this will be sound in
438  terms of not preventing progress, since all the CPG joiners are
439  also API end-point carriers, and that's what matters locally
440  (who's the winner);
441  remotely, we will just compare leave_list and member_list and if
442  the left process has its node retained in member_list (under some
443  other PID, anyway) we will just ignore it as well
444  XXX: long-term fix is to establish in-out PID-aware tracking? */
445  if (peer) {
446  key = &left_list[i];
447  rival = bsearch(&key, sorted, member_list_entries,
448  sizeof(const struct cpg_address *),
449  cmp_member_list_nodeid);
450  }
451 
452  if (rival == NULL) {
453  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
454  groupName->value, counter, peer_name(peer),
455  left_list[i].nodeid, left_list[i].pid,
456  cpgreason2str(left_list[i].reason));
457  if (peer) {
458  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg,
459  OFFLINESTATUS);
460  }
461  } else if (left_list[i].nodeid == local_nodeid) {
462  crm_warn("Group %s event %d: duplicate local pid %u left%s",
463  groupName->value, counter,
464  left_list[i].pid, cpgreason2str(left_list[i].reason));
465  } else {
466  crm_warn("Group %s event %d: "
467  "%s (node %u) duplicate pid %u left%s (%u remains)",
468  groupName->value, counter, peer_name(peer),
469  left_list[i].nodeid, left_list[i].pid,
470  cpgreason2str(left_list[i].reason), (*rival)->pid);
471  }
472  }
473  free(sorted);
474  sorted = NULL;
475 
476  for (i = 0; i < joined_list_entries; i++) {
477  crm_info("Group %s event %d: node %u pid %u joined%s",
478  groupName->value, counter, joined_list[i].nodeid,
479  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
480  }
481 
482  for (i = 0; i < member_list_entries; i++) {
483  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
484 
485  if (member_list[i].nodeid == local_nodeid
486  && member_list[i].pid != getpid()) {
487  /* see the note above */
488  crm_warn("Group %s event %d: detected duplicate local pid %u",
489  groupName->value, counter, member_list[i].pid);
490  continue;
491  }
492  crm_info("Group %s event %d: %s (node %u pid %u) is member",
493  groupName->value, counter, peer_name(peer),
494  member_list[i].nodeid, member_list[i].pid);
495 
496  /* If the caller left auto-reaping enabled, this will also update the
497  * state to member.
498  */
499  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
500 
501  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
502  /* The node is a CPG member, but we currently think it's not a
503  * cluster member. This is possible only if auto-reaping was
504  * disabled. The node may be joining, and we happened to get the CPG
505  * notification before the quorum notification; or the node may have
506  * just died, and we are processing its final messages; or a bug
507  * has affected the peer cache.
508  */
509  time_t now = time(NULL);
510 
511  if (peer->when_lost == 0) {
512  // Track when we first got into this contradictory state
513  peer->when_lost = now;
514 
515  } else if (now > (peer->when_lost + 60)) {
516  // If it persists for more than a minute, update the state
517  crm_warn("Node %u is member of group %s but was believed offline",
518  member_list[i].nodeid, groupName->value);
519  crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
520  }
521  }
522 
523  if (local_nodeid == member_list[i].nodeid) {
524  found = TRUE;
525  }
526  }
527 
528  if (!found) {
529  crm_err("Local node was evicted from group %s", groupName->value);
530  cpg_evicted = TRUE;
531  }
532 
533  counter++;
534 }
535 
536 gboolean
538 {
539  cs_error_t rc;
540  int fd = -1;
541  int retries = 0;
542  uint32_t id = 0;
543  crm_node_t *peer = NULL;
544  cpg_handle_t handle = 0;
545  const char *message_name = pcmk_message_name(crm_system_name);
546  uid_t found_uid = 0;
547  gid_t found_gid = 0;
548  pid_t found_pid = 0;
549  int rv;
550 
551  struct mainloop_fd_callbacks cpg_fd_callbacks = {
552  .dispatch = pcmk_cpg_dispatch,
553  .destroy = cluster->destroy,
554  };
555 
556  cpg_callbacks_t cpg_callbacks = {
557  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
558  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
559  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
560  /* .cpg_confchg_fn = pcmk_cpg_membership, */
561  };
562 
563  cpg_evicted = FALSE;
564  cluster->group.length = 0;
565  cluster->group.value[0] = 0;
566 
567  /* group.value is char[128] */
568  strncpy(cluster->group.value, message_name, 127);
569  cluster->group.value[127] = 0;
570  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
571 
572  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
573  if (rc != CS_OK) {
574  crm_err("Could not connect to the CPG API: %s (%d)",
575  cs_strerror(rc), rc);
576  goto bail;
577  }
578 
579  rc = cpg_fd_get(handle, &fd);
580  if (rc != CS_OK) {
581  crm_err("Could not obtain the CPG API connection: %s (%d)",
582  cs_strerror(rc), rc);
583  goto bail;
584  }
585 
586  /* CPG provider run as root (in given user namespace, anyway)? */
587  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
588  &found_uid, &found_gid))) {
589  crm_err("CPG provider is not authentic:"
590  " process %lld (uid: %lld, gid: %lld)",
591  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
592  (long long) found_uid, (long long) found_gid);
593  rc = CS_ERR_ACCESS;
594  goto bail;
595  } else if (rv < 0) {
596  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
597  strerror(-rv), -rv);
598  rc = CS_ERR_ACCESS;
599  goto bail;
600  }
601 
602  id = get_local_nodeid(handle);
603  if (id == 0) {
604  crm_err("Could not get local node id from the CPG API");
605  goto bail;
606 
607  }
608  cluster->nodeid = id;
609 
610  retries = 0;
611  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
612  if (rc != CS_OK) {
613  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
614  goto bail;
615  }
616 
617  pcmk_cpg_handle = handle;
618  cluster->cpg_handle = handle;
619  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
620 
621  bail:
622  if (rc != CS_OK) {
623  cpg_finalize(handle);
624  return FALSE;
625  }
626 
627  peer = crm_get_peer(id, NULL);
628  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
629  return TRUE;
630 }
631 
632 gboolean
633 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
634 {
635  gboolean rc = TRUE;
636  char *data = NULL;
637 
638  data = dump_xml_unformatted(msg);
639  rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
640  free(data);
641  return rc;
642 }
643 
/*!
 * \brief Build a CPG message carrying a text payload and queue it for sending
 *
 * \param[in] msg_class  Message class (only crm_class_cluster is accepted)
 * \param[in] data       NUL-terminated payload (NULL is sent as "")
 * \param[in] local      Stored in the message's host.local field
 * \param[in] node       Destination node, or NULL to address all nodes
 * \param[in] dest       Destination daemon type (must not be crm_msg_ais)
 *
 * \return FALSE if \p msg_class or \p dest is rejected, otherwise TRUE once
 *         the message has been handed to send_cpg_iov()
 * \note When the payload size including its terminating NUL is at least
 *       CRM_BZ2_THRESHOLD, compression is attempted, with a fallback to the
 *       uncompressed payload if it fails.
 */
644 gboolean
645 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
646  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
647 {
/* The local name, its length and the PID are looked up once and kept in
 * these statics; msg_id numbers outgoing messages.
 */
648  static int msg_id = 0;
649  static int local_pid = 0;
650  static int local_name_len = 0;
651  static const char *local_name = NULL;
652 
653  char *target = NULL;
654  struct iovec *iov;
655  AIS_Message *msg = NULL;
/* NOTE(review): the declaration of the local variable `sender` (source line
 * 656) is missing from this listing. It is used below as msg->sender.type;
 * confirm its initializer against the actual cpg.c.
 */
657 
658  switch (msg_class) {
659  case crm_class_cluster:
660  break;
661  default:
662  crm_err("Invalid message class: %d", msg_class);
663  return FALSE;
664  }
665 
666  CRM_CHECK(dest != crm_msg_ais, return FALSE);
667 
668  if(local_name == NULL) {
669  local_name = get_local_node_name();
670  }
671  if(local_name_len == 0 && local_name) {
672  local_name_len = strlen(local_name);
673  }
674 
675  if (data == NULL) {
676  data = "";
677  }
678 
679  if (local_pid == 0) {
680  local_pid = getpid();
681  }
682 
/* Without a known sender type, identify the sender by its PID */
683  if (sender == crm_msg_none) {
684  sender = local_pid;
685  }
686 
/* NOTE(review): this calloc() result (and the iovec calloc() below) is
 * dereferenced without a NULL check.
 */
687  msg = calloc(1, sizeof(AIS_Message));
688 
689  msg_id++;
690  msg->id = msg_id;
691  msg->header.id = msg_class;
692  msg->header.error = CS_OK;
693 
694  msg->host.type = dest;
695  msg->host.local = local;
696 
/* Address the message to one node (by name if known, and by ID), or to all
 * nodes. NOTE(review): host.uname is char[MAX_NAME], but strlen(node->uname)
 * is not bounded by MAX_NAME before the memcpy(); confirm that node names are
 * always shorter.
 */
697  if (node) {
698  if (node->uname) {
699  target = strdup(node->uname);
700  msg->host.size = strlen(node->uname);
701  memset(msg->host.uname, 0, MAX_NAME);
702  memcpy(msg->host.uname, node->uname, msg->host.size);
703  } else {
704  target = crm_strdup_printf("%u", node->id);
705  }
706  msg->host.id = node->id;
707  } else {
708  target = strdup("all");
709  }
710 
/* sender.id is left 0 here; the receiver sets it from the node ID that
 * corosync reports. NOTE(review): local_name_len is likewise not bounded by
 * MAX_NAME.
 */
711  msg->sender.id = 0;
712  msg->sender.type = sender;
713  msg->sender.pid = local_pid;
714  msg->sender.size = local_name_len;
715  memset(msg->sender.uname, 0, MAX_NAME);
716  if(local_name && msg->sender.size) {
717  memcpy(msg->sender.uname, local_name, msg->sender.size);
718  }
719 
/* The payload size includes the terminating NUL */
720  msg->size = 1 + strlen(data);
721  msg->header.size = sizeof(AIS_Message) + msg->size;
722 
/* Small payloads are appended as-is; larger ones are compressed if possible */
723  if (msg->size < CRM_BZ2_THRESHOLD) {
724  msg = realloc_safe(msg, msg->header.size);
725  memcpy(msg->data, data, msg->size);
726 
727  } else {
728  char *compressed = NULL;
729  unsigned int new_size = 0;
730  char *uncompressed = strdup(data);
731 
732  if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
733  &compressed, &new_size) == pcmk_rc_ok) {
734 
735  msg->header.size = sizeof(AIS_Message) + new_size;
736  msg = realloc_safe(msg, msg->header.size);
737  memcpy(msg->data, compressed, new_size);
738 
739  msg->is_compressed = TRUE;
740  msg->compressed_size = new_size;
741 
742  } else {
/* Compression failed: fall back to sending the payload uncompressed */
743  // cppcheck seems not to understand the abort logic in realloc_safe
744  // cppcheck-suppress memleak
745  msg = realloc_safe(msg, msg->header.size);
746  memcpy(msg->data, data, msg->size);
747  }
748 
749  free(uncompressed);
750  free(compressed);
751  }
752 
753  iov = calloc(1, sizeof(struct iovec));
754  iov->iov_base = msg;
755  iov->iov_len = msg->header.size;
756 
757  if (msg->compressed_size) {
758  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
759  msg->id, target, (unsigned long long) iov->iov_len,
760  msg->compressed_size, data);
761  } else {
762  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
763  msg->id, target, (unsigned long long) iov->iov_len,
764  msg->size, data);
765  }
766  free(target);
767 
/* The send queue takes ownership of iov and msg; both are freed once sent */
768  send_cpg_iov(iov);
769 
770  return TRUE;
771 }
772 
774 text2msg_type(const char *text)
775 {
776  int type = crm_msg_none;
777 
778  CRM_CHECK(text != NULL, return type);
779  text = pcmk_message_name(text);
780  if (safe_str_eq(text, "ais")) {
781  type = crm_msg_ais;
782  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
783  type = crm_msg_cib;
784  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)
785  || safe_str_eq(text, CRM_SYSTEM_DC)) {
786  type = crm_msg_crmd;
787  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
788  type = crm_msg_te;
789  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
790  type = crm_msg_pe;
791  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
792  type = crm_msg_lrmd;
793  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
794  type = crm_msg_stonithd;
795  } else if (safe_str_eq(text, "stonith-ng")) {
796  type = crm_msg_stonith_ng;
797  } else if (safe_str_eq(text, "attrd")) {
798  type = crm_msg_attrd;
799 
800  } else {
801  /* This will normally be a transient client rather than
802  * a cluster daemon. Set the type to the pid of the client
803  */
804  int scan_rc = sscanf(text, "%d", &type);
805 
806  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
807  /* Ensure it's sane */
808  type = crm_msg_none;
809  }
810  }
811  return type;
812 }
enum crm_ais_msg_types type
Definition: internal.h:22
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:233
char data[0]
Definition: internal.h:39
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:219
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
#define crm_notice(fmt, args...)
Definition: logging.h:365
gboolean is_compressed
Definition: internal.h:31
const char * bz2_strerror(int rc)
Definition: results.c:718
uint32_t size
Definition: internal.h:36
gboolean safe_str_neq(const char *a, const char *b)
Definition: strings.c:263
crm_ais_msg_types
Definition: cluster.h:99
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:881
uint32_t nodeid
Definition: cluster.h:79
uint32_t id
Definition: cluster.h:65
const char * get_local_node_name(void)
Definition: cluster.c:120
void(* destroy)(gpointer)
Definition: cluster.h:81
uint32_t id
Definition: internal.h:19
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:34
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:653
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:115
char * crm_system_name
Definition: utils.c:52
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:965
#define CS_SEND_MAX
Definition: cpg.c:146
uint32_t pid
Definition: internal.h:81
char * strerror(int errnum)
AIS_Host sender
Definition: internal.h:85
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:645
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:251
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:98
uint32_t id
Definition: internal.h:80
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
int cs_message_timer
Definition: cpg.c:134
#define crm_warn(fmt, args...)
Definition: logging.h:364
int rc
Definition: pcmk_fence.c:34
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:774
#define crm_debug(fmt, args...)
Definition: logging.h:368
const char * pcmk_message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: utils.c:517
time_t when_lost
Definition: cluster.h:66
GListPtr cs_message_queue
Definition: cpg.c:133
#define crm_trace(fmt, args...)
Definition: logging.h:369
gboolean local
Definition: internal.h:21
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:785
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:541
#define CRM_SYSTEM_PENGINE
Definition: crm.h:104
AIS_Host sender
Definition: internal.h:34
uint32_t id
Definition: internal.h:30
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: corosync.c:421
struct crm_ais_msg_s AIS_Message
Definition: internal.h:16
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
#define ais_data_len(msg)
Definition: internal.h:125
uint32_t size
Definition: internal.h:23
#define CRM_NODE_MEMBER
Definition: cluster.h:34
uint32_t compressed_size
Definition: internal.h:37
#define MAX_NAME
Definition: crm.h:60
#define CRM_SYSTEM_CRMD
Definition: crm.h:102
const char * target
Definition: pcmk_fence.c:28
crm_ais_msg_class
Definition: cluster.h:95
#define CRM_XS
Definition: logging.h:54
#define CRM_SYSTEM_STONITHD
Definition: crm.h:106
#define CRM_SYSTEM_CIB
Definition: crm.h:101
#define CRM_SYSTEM_TENGINE
Definition: crm.h:105
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
#define crm_err(fmt, args...)
Definition: logging.h:363
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:152
#define CRM_ASSERT(expr)
Definition: results.h:42
char uname[MAX_NAME]
Definition: internal.h:24
#define OFFLINESTATUS
Definition: util.h:38
#define CRM_BZ2_THRESHOLD
Definition: xml.h:47
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3321
#define CRM_SYSTEM_LRMD
Definition: crm.h:103
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:633
char data[0]
Definition: internal.h:90
char * state
Definition: cluster.h:59
#define pcmk_ok
Definition: results.h:67
Wrappers for and extensions to libqb IPC.
uint32_t pid
Definition: internal.h:20
char * uname
Definition: cluster.h:57
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
AIS_Host host
Definition: internal.h:33
#define safe_str_eq(a, b)
Definition: util.h:65
#define ONLINESTATUS
Definition: util.h:37
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:522
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:404
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
GList * GListPtr
Definition: crm.h:214
#define crm_info(fmt, args...)
Definition: logging.h:366
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:537
enum crm_ais_msg_types type
Definition: internal.h:83
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc.c:1672
gboolean local
Definition: internal.h:82