pacemaker  3.0.0-d8340737c4
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2024 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12 
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h> // PRIx32
27 
28 #include <glib.h>
29 #include <bzlib.h>
30 
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
36 
37 #include <gnutls/gnutls.h>
38 
/* Byte-swapping macros (same names as those provided by linux/swab.h) */
#ifdef HAVE_LINUX_SWAB_H
#  include <linux/swab.h>
#else
/*
 * Each macro first converts its argument to the exact-width unsigned type,
 * because there is no portable way to know for sure how the U/UL/ULL
 * constant suffixes map to 16-, 32- and 64-bit types.
 *
 * The argument is evaluated more than once, so it must not have side effects.
 */

// Exchange the two bytes of a 16-bit value
#define __swab16(x) ((uint16_t) (                                           \
    (uint16_t) (((uint16_t) (x)) << 8) |                                    \
    (uint16_t) (((uint16_t) (x)) >> 8)))

// Reverse the four bytes of a 32-bit value
#define __swab32(x) ((uint32_t) (                                           \
    (((uint32_t) (x)) << 24) |                                              \
    ((((uint32_t) (x)) << 8) & (uint32_t) 0x00ff0000UL) |                   \
    ((((uint32_t) (x)) >> 8) & (uint32_t) 0x0000ff00UL) |                   \
    (((uint32_t) (x)) >> 24)))

// Reverse the eight bytes of a 64-bit value
#define __swab64(x) ((uint64_t) (                                           \
    (((uint64_t) (x)) << 56) |                                              \
    ((((uint64_t) (x)) << 40) & (uint64_t) 0x00ff000000000000ULL) |         \
    ((((uint64_t) (x)) << 24) & (uint64_t) 0x0000ff0000000000ULL) |         \
    ((((uint64_t) (x)) << 8)  & (uint64_t) 0x000000ff00000000ULL) |         \
    ((((uint64_t) (x)) >> 8)  & (uint64_t) 0x00000000ff000000ULL) |         \
    ((((uint64_t) (x)) >> 24) & (uint64_t) 0x0000000000ff0000ULL) |         \
    ((((uint64_t) (x)) >> 40) & (uint64_t) 0x000000000000ff00ULL) |         \
    (((uint64_t) (x)) >> 56)))
#endif
67 
68 #define REMOTE_MSG_VERSION 1
69 #define ENDIAN_LOCAL 0xBADADBBD
70 
/*
 * Fixed-size header placed in front of the payload of every remote message.
 * The struct is packed, so its in-memory layout has no padding.
 */
struct remote_header_v0 {
    // Byte-order marker; a receiver compares it with ENDIAN_LOCAL to detect
    // messages from hosts with different endian-ness
    uint32_t endian;

    // Message format version of the sender
    uint32_t version;

    // Message identifier chosen by the sender
    uint64_t id;

    // Flag bits (no flag values are defined in the code shown here)
    uint64_t flags;

    // Total message length in bytes, header included
    uint32_t size_total;

    // Offset from the start of the message at which the payload begins
    uint32_t payload_offset;

    // Length of the compressed payload; 0 if the payload is not compressed
    uint32_t payload_compressed;

    // Length of the payload once uncompressed
    uint32_t payload_uncompressed;

    /* New fields get added here */

} __attribute__ ((packed));
84 
96 static struct remote_header_v0 *
97 localized_remote_header(pcmk__remote_t *remote)
98 {
99  struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
100  if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
101  return NULL;
102 
103  } else if(header->endian != ENDIAN_LOCAL) {
104  uint32_t endian = __swab32(header->endian);
105 
107  if(endian != ENDIAN_LOCAL) {
108  crm_err("Invalid message detected, endian mismatch: %" PRIx32
109  " is neither %" PRIx32 " nor the swab'd %" PRIx32,
110  ENDIAN_LOCAL, header->endian, endian);
111  return NULL;
112  }
113 
114  header->id = __swab64(header->id);
115  header->flags = __swab64(header->flags);
116  header->endian = __swab32(header->endian);
117 
118  header->version = __swab32(header->version);
119  header->size_total = __swab32(header->size_total);
120  header->payload_offset = __swab32(header->payload_offset);
121  header->payload_compressed = __swab32(header->payload_compressed);
122  header->payload_uncompressed = __swab32(header->payload_uncompressed);
123  }
124 
125  return header;
126 }
127 
128 // \return Standard Pacemaker return code
129 static int
130 send_tls(gnutls_session_t session, struct iovec *iov)
131 {
132  const char *unsent = iov->iov_base;
133  size_t unsent_len = iov->iov_len;
134  ssize_t gnutls_rc;
135 
136  if (unsent == NULL) {
137  return EINVAL;
138  }
139 
140  crm_trace("Sending TLS message of %llu bytes",
141  (unsigned long long) unsent_len);
142  while (true) {
143  gnutls_rc = gnutls_record_send(session, unsent, unsent_len);
144 
145  if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
146  crm_trace("Retrying to send %llu bytes remaining",
147  (unsigned long long) unsent_len);
148 
149  } else if (gnutls_rc < 0) {
150  // Caller can log as error if necessary
151  crm_info("TLS connection terminated: %s " QB_XS " rc=%lld",
152  gnutls_strerror((int) gnutls_rc),
153  (long long) gnutls_rc);
154  return ECONNABORTED;
155 
156  } else if (gnutls_rc < unsent_len) {
157  crm_trace("Sent %lld of %llu bytes remaining",
158  (long long) gnutls_rc, (unsigned long long) unsent_len);
159  unsent_len -= gnutls_rc;
160  unsent += gnutls_rc;
161  } else {
162  crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
163  break;
164  }
165  }
166  return pcmk_rc_ok;
167 }
168 
169 // \return Standard Pacemaker return code
170 static int
171 send_plaintext(int sock, struct iovec *iov)
172 {
173  const char *unsent = iov->iov_base;
174  size_t unsent_len = iov->iov_len;
175  ssize_t write_rc;
176 
177  if (unsent == NULL) {
178  return EINVAL;
179  }
180 
181  crm_debug("Sending plaintext message of %llu bytes to socket %d",
182  (unsigned long long) unsent_len, sock);
183  while (true) {
184  write_rc = write(sock, unsent, unsent_len);
185  if (write_rc < 0) {
186  int rc = errno;
187 
188  if ((errno == EINTR) || (errno == EAGAIN)) {
189  crm_trace("Retrying to send %llu bytes remaining to socket %d",
190  (unsigned long long) unsent_len, sock);
191  continue;
192  }
193 
194  // Caller can log as error if necessary
195  crm_info("Could not send message: %s " QB_XS " rc=%d socket=%d",
196  pcmk_rc_str(rc), rc, sock);
197  return rc;
198 
199  } else if (write_rc < unsent_len) {
200  crm_trace("Sent %lld of %llu bytes remaining",
201  (long long) write_rc, (unsigned long long) unsent_len);
202  unsent += write_rc;
203  unsent_len -= write_rc;
204  continue;
205 
206  } else {
207  crm_trace("Sent all %lld bytes remaining: %.100s",
208  (long long) write_rc, (char *) (iov->iov_base));
209  break;
210  }
211  }
212  return pcmk_rc_ok;
213 }
215 // \return Standard Pacemaker return code
216 static int
217 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
218 {
219  int rc = pcmk_rc_ok;
220 
221  for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
222  if (remote->tls_session) {
223  rc = send_tls(remote->tls_session, &(iov[lpc]));
224  continue;
225  }
226  if (remote->tcp_socket) {
227  rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
228  } else {
229  rc = ESOCKTNOSUPPORT;
230  }
231  }
232  return rc;
233 }
234 
244 int
245 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
246 {
247  int rc = pcmk_rc_ok;
248  static uint64_t id = 0;
249  GString *xml_text = NULL;
250 
251  struct iovec iov[2];
252  struct remote_header_v0 *header;
253 
254  CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
255 
256  xml_text = g_string_sized_new(1024);
257  pcmk__xml_string(msg, 0, xml_text, 0);
258  CRM_CHECK(xml_text->len > 0,
259  g_string_free(xml_text, TRUE); return EINVAL);
260 
261  header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
262 
263  iov[0].iov_base = header;
264  iov[0].iov_len = sizeof(struct remote_header_v0);
265 
266  iov[1].iov_len = 1 + xml_text->len;
267  iov[1].iov_base = g_string_free(xml_text, FALSE);
268 
269  id++;
270  header->id = id;
271  header->endian = ENDIAN_LOCAL;
272  header->version = REMOTE_MSG_VERSION;
273  header->payload_offset = iov[0].iov_len;
274  header->payload_uncompressed = iov[1].iov_len;
275  header->size_total = iov[0].iov_len + iov[1].iov_len;
276 
277  rc = remote_send_iovs(remote, iov, 2);
278  if (rc != pcmk_rc_ok) {
279  crm_err("Could not send remote message: %s " QB_XS " rc=%d",
280  pcmk_rc_str(rc), rc);
281  }
282 
283  free(iov[0].iov_base);
284  g_free((gchar *) iov[1].iov_base);
285  return rc;
286 }
287 
297 xmlNode *
299 {
300  xmlNode *xml = NULL;
301  struct remote_header_v0 *header = localized_remote_header(remote);
302 
303  if (header == NULL) {
304  return NULL;
305  }
306 
307  /* Support compression on the receiving end now, in case we ever want to add it later */
308  if (header->payload_compressed) {
309  int rc = 0;
310  unsigned int size_u = 1 + header->payload_uncompressed;
311  char *uncompressed =
312  pcmk__assert_alloc(1, header->payload_offset + size_u);
313 
314  crm_trace("Decompressing message data %d bytes into %d bytes",
315  header->payload_compressed, size_u);
316 
317  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
318  remote->buffer + header->payload_offset,
319  header->payload_compressed, 1, 0);
320  rc = pcmk__bzlib2rc(rc);
321 
322  if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
323  crm_warn("Couldn't decompress v%d message, we only understand v%d",
324  header->version, REMOTE_MSG_VERSION);
325  free(uncompressed);
326  return NULL;
327 
328  } else if (rc != pcmk_rc_ok) {
329  crm_err("Decompression failed: %s " QB_XS " rc=%d",
330  pcmk_rc_str(rc), rc);
331  free(uncompressed);
332  return NULL;
333  }
334 
335  pcmk__assert(size_u == header->payload_uncompressed);
336 
337  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
338  remote->buffer_size = header->payload_offset + size_u;
339 
340  free(remote->buffer);
341  remote->buffer = uncompressed;
342  header = localized_remote_header(remote);
343  }
344 
345  /* take ownership of the buffer */
346  remote->buffer_offset = 0;
347 
348  CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
349 
350  xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
351  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
352  crm_warn("Couldn't parse v%d message, we only understand v%d",
353  header->version, REMOTE_MSG_VERSION);
354 
355  } else if (xml == NULL) {
356  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
357  }
358 
359  crm_log_xml_trace(xml, "[remote msg]");
360  return xml;
361 }
362 
363 static int
364 get_remote_socket(const pcmk__remote_t *remote)
365 {
366  if (remote->tls_session) {
367  void *sock_ptr = gnutls_transport_get_ptr(remote->tls_session);
368 
369  return GPOINTER_TO_INT(sock_ptr);
370  }
371 
372  if (remote->tcp_socket) {
373  return remote->tcp_socket;
374  }
375 
376  crm_err("Remote connection type undetermined (bug?)");
377  return -1;
378 }
379 
391 int
392 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
393 {
394  struct pollfd fds = { 0, };
395  int sock = 0;
396  int rc = 0;
397  time_t start;
398  int timeout = timeout_ms;
399 
400  sock = get_remote_socket(remote);
401  if (sock <= 0) {
402  crm_trace("No longer connected");
403  return ENOTCONN;
404  }
405 
406  start = time(NULL);
407  errno = 0;
408  do {
409  fds.fd = sock;
410  fds.events = POLLIN;
411 
412  /* If we got an EINTR while polling, and we have a
413  * specific timeout we are trying to honor, attempt
414  * to adjust the timeout to the closest second. */
415  if (errno == EINTR && (timeout > 0)) {
416  timeout = timeout_ms - ((time(NULL) - start) * 1000);
417  if (timeout < 1000) {
418  timeout = 1000;
419  }
420  }
421 
422  rc = poll(&fds, 1, timeout);
423  } while (rc < 0 && errno == EINTR);
424 
425  if (rc < 0) {
426  return errno;
427  }
428  return (rc == 0)? ETIME : pcmk_rc_ok;
429 }
430 
443 int
445 {
446  int rc = pcmk_rc_ok;
447  size_t read_len = sizeof(struct remote_header_v0);
448  struct remote_header_v0 *header = localized_remote_header(remote);
449  ssize_t read_rc;
450 
451  if(header) {
452  /* Stop at the end of the current message */
453  read_len = header->size_total;
454  }
455 
456  /* automatically grow the buffer when needed */
457  if(remote->buffer_size < read_len) {
458  remote->buffer_size = 2 * read_len;
459  crm_trace("Expanding buffer to %llu bytes",
460  (unsigned long long) remote->buffer_size);
461  remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
462  }
463 
464  if (remote->tls_session) {
465  read_rc = gnutls_record_recv(remote->tls_session,
466  remote->buffer + remote->buffer_offset,
467  remote->buffer_size - remote->buffer_offset);
468  if (read_rc == GNUTLS_E_INTERRUPTED) {
469  rc = EINTR;
470  } else if (read_rc == GNUTLS_E_AGAIN) {
471  rc = EAGAIN;
472  } else if (read_rc < 0) {
473  crm_debug("TLS receive failed: %s (%lld)",
474  gnutls_strerror(read_rc), (long long) read_rc);
475  rc = EIO;
476  }
477  } else if (remote->tcp_socket) {
478  read_rc = read(remote->tcp_socket,
479  remote->buffer + remote->buffer_offset,
480  remote->buffer_size - remote->buffer_offset);
481  if (read_rc < 0) {
482  rc = errno;
483  }
484  } else {
485  crm_err("Remote connection type undetermined (bug?)");
486  return ESOCKTNOSUPPORT;
487  }
488 
489  /* process any errors. */
490  if (read_rc > 0) {
491  remote->buffer_offset += read_rc;
492  /* always null terminate buffer, the +1 to alloc always allows for this. */
493  remote->buffer[remote->buffer_offset] = '\0';
494  crm_trace("Received %lld more bytes (%llu total)",
495  (long long) read_rc,
496  (unsigned long long) remote->buffer_offset);
497 
498  } else if ((rc == EINTR) || (rc == EAGAIN)) {
499  crm_trace("No data available for non-blocking remote read: %s (%d)",
500  pcmk_rc_str(rc), rc);
501 
502  } else if (read_rc == 0) {
503  crm_debug("End of remote data encountered after %llu bytes",
504  (unsigned long long) remote->buffer_offset);
505  return ENOTCONN;
506 
507  } else {
508  crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
509  (unsigned long long) remote->buffer_offset,
510  pcmk_rc_str(rc), rc);
511  return ENOTCONN;
512  }
513 
514  header = localized_remote_header(remote);
515  if(header) {
516  if(remote->buffer_offset < header->size_total) {
517  crm_trace("Read partial remote message (%llu of %u bytes)",
518  (unsigned long long) remote->buffer_offset,
519  header->size_total);
520  } else {
521  crm_trace("Read full remote message of %llu bytes",
522  (unsigned long long) remote->buffer_offset);
523  return pcmk_rc_ok;
524  }
525  }
526 
527  return EAGAIN;
528 }
529 
540 int
542 {
543  int rc = pcmk_rc_ok;
544  time_t start = time(NULL);
545  int remaining_timeout = 0;
546 
547  if (timeout_ms == 0) {
548  timeout_ms = 10000;
549  } else if (timeout_ms < 0) {
550  timeout_ms = 60000;
551  }
552 
553  remaining_timeout = timeout_ms;
554  while (remaining_timeout > 0) {
555 
556  crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
557  remaining_timeout, timeout_ms);
558  rc = pcmk__remote_ready(remote, remaining_timeout);
559 
560  if (rc == ETIME) {
561  crm_err("Timed out (%d ms) while waiting for remote data",
562  remaining_timeout);
563  return rc;
564 
565  } else if (rc != pcmk_rc_ok) {
566  crm_debug("Wait for remote data aborted (will retry): %s "
567  QB_XS " rc=%d", pcmk_rc_str(rc), rc);
568 
569  } else {
571  if (rc == pcmk_rc_ok) {
572  return rc;
573  } else if (rc == EAGAIN) {
574  crm_trace("Waiting for more remote data");
575  } else {
576  crm_debug("Could not receive remote data: %s " QB_XS " rc=%d",
577  pcmk_rc_str(rc), rc);
578  }
579  }
580 
581  // Don't waste time retrying after fatal errors
582  if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
583  return rc;
584  }
585 
586  remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
587  }
588  return ETIME;
589 }
590 
/* State carried by the timer that checks whether an asynchronous connect()
 * has finished
 */
struct tcp_async_cb_data {
    // Socket being connected
    int sock;

    // How long to keep waiting for the connection, in milliseconds
    int timeout_ms;

    // When the connection attempt started (0 if connect() succeeded at once)
    time_t start;

    // Caller's data, passed through to the callback
    void *userdata;

    // Function to call with the result once the attempt is finished
    void (*callback) (void *userdata, int rc, int sock);
};
598 
599 // \return TRUE if timer should be rescheduled, FALSE otherwise
600 static gboolean
601 check_connect_finished(gpointer userdata)
602 {
603  struct tcp_async_cb_data *cb_data = userdata;
604  int rc;
605 
606  fd_set rset, wset;
607  struct timeval ts = { 0, };
608 
609  if (cb_data->start == 0) {
610  // Last connect() returned success immediately
611  rc = pcmk_rc_ok;
612  goto dispatch_done;
613  }
614 
615  // If the socket is ready for reading or writing, the connect succeeded
616  FD_ZERO(&rset);
617  FD_SET(cb_data->sock, &rset);
618  wset = rset;
619  rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
620 
621  if (rc < 0) { // select() error
622  rc = errno;
623  if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
624  if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
625  return TRUE; // There is time left, so reschedule timer
626  } else {
627  rc = ETIMEDOUT;
628  }
629  }
630  crm_trace("Could not check socket %d for connection success: %s (%d)",
631  cb_data->sock, pcmk_rc_str(rc), rc);
632 
633  } else if (rc == 0) { // select() timeout
634  if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
635  return TRUE; // There is time left, so reschedule timer
636  }
637  crm_debug("Timed out while waiting for socket %d connection success",
638  cb_data->sock);
639  rc = ETIMEDOUT;
640 
641  // select() returned number of file descriptors that are ready
642 
643  } else if (FD_ISSET(cb_data->sock, &rset)
644  || FD_ISSET(cb_data->sock, &wset)) {
645 
646  // The socket is ready; check it for connection errors
647  int error = 0;
648  socklen_t len = sizeof(error);
649 
650  if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
651  rc = errno;
652  crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
653  cb_data->sock, pcmk_rc_str(rc), rc);
654  } else if (error != 0) {
655  rc = error;
656  crm_trace("Socket %d connected with error: %s (%d)",
657  cb_data->sock, pcmk_rc_str(rc), rc);
658  } else {
659  rc = pcmk_rc_ok;
660  }
661 
662  } else { // Should not be possible
663  crm_trace("select() succeeded, but socket %d not in resulting "
664  "read/write sets", cb_data->sock);
665  rc = EAGAIN;
666  }
667 
668  dispatch_done:
669  if (rc == pcmk_rc_ok) {
670  crm_trace("Socket %d is connected", cb_data->sock);
671  } else {
672  close(cb_data->sock);
673  cb_data->sock = -1;
674  }
675 
676  if (cb_data->callback) {
677  cb_data->callback(cb_data->userdata, rc, cb_data->sock);
678  }
679  free(cb_data);
680  return FALSE; // Do not reschedule timer
681 }
682 
701 static int
702 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
703  int timeout_ms, int *timer_id, void *userdata,
704  void (*callback) (void *userdata, int rc, int sock))
705 {
706  int rc = 0;
707  int interval = 500;
708  int timer;
709  struct tcp_async_cb_data *cb_data = NULL;
710 
711  rc = pcmk__set_nonblocking(sock);
712  if (rc != pcmk_rc_ok) {
713  crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
714  pcmk_rc_str(rc), rc);
715  return rc;
716  }
717 
718  rc = connect(sock, addr, addrlen);
719  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
720  rc = errno;
721  crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
722  pcmk_rc_str(rc), rc);
723  return rc;
724  }
725 
726  cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
727  cb_data->userdata = userdata;
728  cb_data->callback = callback;
729  cb_data->sock = sock;
730  cb_data->timeout_ms = timeout_ms;
731 
732  if (rc == 0) {
733  /* The connect was successful immediately, we still return to mainloop
734  * and let this callback get called later. This avoids the user of this api
735  * to have to account for the fact the callback could be invoked within this
736  * function before returning. */
737  cb_data->start = 0;
738  interval = 1;
739  } else {
740  cb_data->start = time(NULL);
741  }
742 
743  /* This timer function does a non-blocking poll on the socket to see if we
744  * can use it. Once we can, the connect has completed. This method allows us
745  * to connect without blocking the mainloop.
746  *
747  * @TODO Use a mainloop fd callback for this instead of polling. Something
748  * about the way mainloop is currently polling prevents this from
749  * working at the moment though. (See connect(2) regarding EINPROGRESS
750  * for possible new handling needed.)
751  */
752  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
753  interval, sock);
754  timer = pcmk__create_timer(interval, check_connect_finished, cb_data);
755  if (timer_id) {
756  *timer_id = timer;
757  }
758 
759  // timer callback should be taking care of cb_data
760  // cppcheck-suppress memleak
761  return pcmk_rc_ok;
762 }
763 
774 static int
775 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
776 {
777  int rc = connect(sock, addr, addrlen);
778 
779  if (rc < 0) {
780  rc = errno;
781  crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
782  pcmk_rc_str(rc), rc);
783  return rc;
784  }
785 
786  rc = pcmk__set_nonblocking(sock);
787  if (rc != pcmk_rc_ok) {
788  crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
789  pcmk_rc_str(rc), rc);
790  return rc;
791  }
792 
793  return pcmk_ok;
794 }
795 
/*!
 * \internal
 * \brief Connect to a server at a specified TCP port
 *
 * The host's addresses are tried in the order getaddrinfo() returns them,
 * until one succeeds.
 *
 * \param[in]  host      Name of server to connect to
 * \param[in]  port      Server port to connect to
 * \param[in]  timeout   If \p callback is not NULL, how long to wait for the
 *                       connection, in milliseconds
 * \param[out] timer_id  If \p callback is not NULL and this is not NULL,
 *                       where to store the ID of the completion-check timer
 * \param[out] sock_fd   Where to store the socket file descriptor (-1 if no
 *                       connection could be made or started)
 * \param[in]  userdata  Passed through to \p callback
 * \param[in]  callback  If NULL, connect synchronously; otherwise connect
 *                       asynchronously and call this with the result
 *
 * \return Standard Pacemaker return code. With a callback, pcmk_rc_ok means
 *         only that a connection attempt was started.
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;   /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = res->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " QB_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* On success, reset rc explicitly: it may still hold the error from
         * an earlier address that failed, which must not be returned
         * alongside a valid socket.
         */
        if (callback) {
            if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
                                     timer_id, userdata, callback) == pcmk_rc_ok) {
                rc = pcmk_rc_ok;
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else if (connect_socket_once(sock, rp->ai_addr,
                                       rp->ai_addrlen) == pcmk_rc_ok) {
            rc = pcmk_rc_ok;
            break;          /* Success */
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
906 
/*!
 * \internal
 * \brief Convert an IPv4 or IPv6 socket address to its text form
 *
 * \param[in]  sa  Socket address (struct sockaddr_in or struct sockaddr_in6)
 * \param[out] s   Where to store the text; must have room for at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note If the address family is neither AF_INET nor AF_INET6, the string
 *       "<invalid>" is stored instead.
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const int family = ((const struct sockaddr *) sa)->sa_family;
    const void *raw_addr = NULL;

    if (family == AF_INET) {
        raw_addr = &(((const struct sockaddr_in *) sa)->sin_addr);

    } else if (family == AF_INET6) {
        raw_addr = &(((const struct sockaddr_in6 *) sa)->sin6_addr);

    } else {
        strcpy(s, "<invalid>");
        return;
    }

    inet_ntop(family, raw_addr, s, INET6_ADDRSTRLEN);
}
937 
947 int
948 pcmk__accept_remote_connection(int ssock, int *csock)
949 {
950  int rc;
951  struct sockaddr_storage addr;
952  socklen_t laddr = sizeof(addr);
953  char addr_str[INET6_ADDRSTRLEN];
954 #ifdef TCP_USER_TIMEOUT
955  long sbd_timeout = 0;
956 #endif
957 
958  /* accept the connection */
959  memset(&addr, 0, sizeof(addr));
960  *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
961  if (*csock == -1) {
962  rc = errno;
963  crm_err("Could not accept remote client connection: %s "
964  QB_XS " rc=%d", pcmk_rc_str(rc), rc);
965  return rc;
966  }
967  pcmk__sockaddr2str(&addr, addr_str);
968  crm_info("Accepted new remote client connection from %s", addr_str);
969 
970  rc = pcmk__set_nonblocking(*csock);
971  if (rc != pcmk_rc_ok) {
972  crm_err("Could not set socket non-blocking: %s " QB_XS " rc=%d",
973  pcmk_rc_str(rc), rc);
974  close(*csock);
975  *csock = -1;
976  return rc;
977  }
978 
979 #ifdef TCP_USER_TIMEOUT
980  sbd_timeout = pcmk__get_sbd_watchdog_timeout();
981  if (sbd_timeout > 0) {
982  // Time to fail and retry before watchdog
983  long half = sbd_timeout / 2;
984  unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
985 
986  rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
987  &optval, sizeof(optval));
988  if (rc < 0) {
989  rc = errno;
990  crm_err("Could not set TCP timeout to %d ms on remote connection: "
991  "%s " QB_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
992  close(*csock);
993  *csock = -1;
994  return rc;
995  }
996  }
997 #endif
998 
999  return rc;
1000 }
1001 
1007 int
1009 {
1010  static int port = 0;
1011 
1012  if (port == 0) {
1013  const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1014 
1015  if (env) {
1016  errno = 0;
1017  port = strtol(env, NULL, 10);
1018  if (errno || (port < 1) || (port > 65535)) {
1019  crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1020  " has invalid value '%s', using %d instead",
1021  env, DEFAULT_REMOTE_PORT);
1022  port = DEFAULT_REMOTE_PORT;
1023  }
1024  } else {
1025  port = DEFAULT_REMOTE_PORT;
1026  }
1027  }
1028  return port;
1029 }
pcmk__cpg_host_t host
Definition: cpg.c:52
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:213
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition: io.c:520
#define ETIME
Definition: portability.h:66
uint32_t payload_compressed
Definition: remote.c:214
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition: xml_io.c:411
size_t buffer_offset
Definition: ipc_internal.h:106
#define PCMK__ENV_REMOTE_PORT
uint32_t payload_uncompressed
Definition: remote.c:215
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:196
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1008
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:609
#define ENDIAN_LOCAL
Definition: remote.c:69
const char * pcmk__env_option(const char *option)
Definition: options.c:1075
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:208
void pcmk__sockaddr2str(const void *sa, char *s)
Definition: remote.c:919
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:56
#define crm_warn(fmt, args...)
Definition: logging.h:362
#define crm_debug(fmt, args...)
Definition: logging.h:370
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:392
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
Definition: utils.c:401
#define crm_trace(fmt, args...)
Definition: logging.h:372
uint64_t id
Definition: remote.c:210
#define __swab64(x)
Definition: remote.c:57
size_t buffer_size
Definition: ipc_internal.h:105
Wrappers for and extensions to libxml2.
#define PCMK__NELEM(a)
Definition: internal.h:49
int pcmk__read_available_remote_data(pcmk__remote_t *remote)
Definition: remote.c:444
long pcmk__get_sbd_watchdog_timeout(void)
Definition: watchdog.c:231
uint32_t payload_offset
Definition: remote.c:213
xmlNode * pcmk__xml_parse(const char *input)
Definition: xml_io.c:168
struct tcp_async_cb_data __attribute__
#define pcmk__assert(expr)
uint32_t size_total
Definition: remote.c:212
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition: results.c:1014
#define __swab32(x)
Definition: remote.c:51
#define REMOTE_MSG_VERSION
Definition: remote.c:68
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
Definition: results.c:973
#define crm_err(fmt, args...)
Definition: logging.h:359
guint pcmk__timeout_ms2s(guint timeout_ms)
Definition: utils.c:425
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:948
gnutls_session_t tls_session
Definition: ipc_internal.h:117
#define pcmk_ok
Definition: results.h:65
#define crm_log_xml_trace(xml, text)
Definition: logging.h:380
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:813
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:541
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:298
#define pcmk__assert_alloc(nmemb, size)
Definition: internal.h:257
unsigned int timeout
Definition: pcmk_fence.c:34
#define crm_info(fmt, args...)
Definition: logging.h:367
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
Definition: remote.c:245
uint32_t version
Definition: remote.c:209
uint64_t flags
Definition: remote.c:211