pacemaker  2.1.9-49aab99839
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2024 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12 
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h> // PRIx32
27 
28 #include <glib.h>
29 #include <bzlib.h>
30 
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
35 
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39 
/* Byte-swapping helpers: use the kernel's versions when the platform has
 * them, otherwise fall back to equivalent portable macros with the same names.
 */
#ifdef HAVE_LINUX_SWAB_H
# include <linux/swab.h>
#else
/*
 * The argument is cast to the fixed-width type before any shifting, because
 * there is no portable way to know how the U/UL/ULL suffixes map onto 16-,
 * 32-, and 64-bit types. Each byte is shifted into its mirrored position and
 * then masked so that bits shifted in from neighboring bytes are dropped.
 */
#define __swab16(x) ((uint16_t)( \
    ((uint16_t)(x) >> 8) | \
    ((uint16_t)(x) << 8)))

#define __swab32(x) ((uint32_t)( \
    ((uint32_t)(x) << 24) | \
    (((uint32_t)(x) << 8) & (uint32_t)0x00ff0000UL) | \
    (((uint32_t)(x) >> 8) & (uint32_t)0x0000ff00UL) | \
    ((uint32_t)(x) >> 24)))

#define __swab64(x) ((uint64_t)( \
    ((uint64_t)(x) << 56) | \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    (((uint64_t)(x) << 8) & (uint64_t)0x000000ff00000000ULL) | \
    (((uint64_t)(x) >> 8) & (uint64_t)0x00000000ff000000ULL) | \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    ((uint64_t)(x) >> 56)))
#endif
68 
/* Wire-format version written into every header by pcmk__remote_send_xml();
 * receivers compare a peer's version against it when a message can't be
 * decompressed or parsed.
 */
#define REMOTE_MSG_VERSION 1

/* Marker stored in the header in the sender's native byte order. A receiver
 * that sees the byte-swapped value knows the sender has the opposite
 * endianness and must convert the header (see localized_remote_header()).
 */
#define ENDIAN_LOCAL 0xBADADBBD

/* Fixed header that precedes every remote message on the wire. It is packed
 * so its layout does not depend on compiler padding; all fields are in the
 * sender's byte order until localized.
 */
struct remote_header_v0 {
    uint32_t endian; /* Detect messages from hosts with different endian-ness */
    uint32_t version;               // Sender's REMOTE_MSG_VERSION
    uint64_t id;                    // Sender's message counter (incremented per send)
    uint64_t flags;                 // Not set or read by the code in this file
    uint32_t size_total;            // Header plus payload length, in bytes
    uint32_t payload_offset;        // Payload start, relative to the header start
    uint32_t payload_compressed;    // Compressed payload length, or 0 if uncompressed
    uint32_t payload_uncompressed;  // Uncompressed payload length (incl. trailing NUL)

    /* New fields get added here */

} __attribute__ ((packed));
85 
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100  struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101  if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102  return NULL;
103 
104  } else if(header->endian != ENDIAN_LOCAL) {
105  uint32_t endian = __swab32(header->endian);
106 
108  if(endian != ENDIAN_LOCAL) {
109  crm_err("Invalid message detected, endian mismatch: %" PRIx32
110  " is neither %" PRIx32 " nor the swab'd %" PRIx32,
111  ENDIAN_LOCAL, header->endian, endian);
112  return NULL;
113  }
114 
115  header->id = __swab64(header->id);
116  header->flags = __swab64(header->flags);
117  header->endian = __swab32(header->endian);
118 
119  header->version = __swab32(header->version);
120  header->size_total = __swab32(header->size_total);
121  header->payload_offset = __swab32(header->payload_offset);
122  header->payload_compressed = __swab32(header->payload_compressed);
123  header->payload_uncompressed = __swab32(header->payload_uncompressed);
124  }
125 
126  return header;
127 }
128 
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130 
131 int
132 pcmk__tls_client_try_handshake(pcmk__remote_t *remote, int *gnutls_rc)
133 {
134  int rc = pcmk_rc_ok;
135 
136  if (gnutls_rc != NULL) {
137  *gnutls_rc = GNUTLS_E_SUCCESS;
138  }
139 
140  rc = gnutls_handshake(*remote->tls_session);
141 
142  switch (rc) {
143  case GNUTLS_E_SUCCESS:
144  rc = pcmk_rc_ok;
145  break;
146 
147  case GNUTLS_E_INTERRUPTED:
148  case GNUTLS_E_AGAIN:
149  rc = EAGAIN;
150  break;
151 
152  default:
153  if (gnutls_rc != NULL) {
154  *gnutls_rc = rc;
155  }
156 
157  rc = EPROTO;
158  break;
159  }
160 
161  return rc;
162 }
163 
164 int pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_sec,
165  int *gnutls_rc)
166 {
167  const time_t time_limit = time(NULL) + timeout_sec;
168 
169  do {
170  int rc = pcmk__tls_client_try_handshake(remote, gnutls_rc);
171 
172  if (rc != EAGAIN) {
173  return rc;
174  }
175  } while (time(NULL) < time_limit);
176 
177  return ETIME;
178 }
179 
186 static void
187 set_minimum_dh_bits(const gnutls_session_t *session)
188 {
189  int dh_min_bits;
190 
192  0);
193 
194  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
195  * the priority string imply the DH requirements, but this is the only
196  * way to give the user control over compatibility with older servers.
197  */
198  if (dh_min_bits > 0) {
199  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
200  dh_min_bits);
201  crm_warn("Support for the " PCMK__ENV_DH_MIN_BITS " "
202  "environment variable is deprecated and will be removed "
203  "in a future release");
204  gnutls_dh_set_prime_bits(*session, dh_min_bits);
205  }
206 }
207 
208 static unsigned int
209 get_bound_dh_bits(unsigned int dh_bits)
210 {
211  int dh_min_bits;
212  int dh_max_bits;
215  0);
217  0);
219  if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
220  crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
221  dh_max_bits = 0;
222  }
223  if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
224  return dh_min_bits;
225  }
226  if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
227  return dh_max_bits;
228  }
229  return dh_bits;
230 }
231 
243 gnutls_session_t *
244 pcmk__new_tls_session(int csock, unsigned int conn_type,
245  gnutls_credentials_type_t cred_type, void *credentials)
246 {
247  int rc = GNUTLS_E_SUCCESS;
248  const char *prio_base = NULL;
249  char *prio = NULL;
250  gnutls_session_t *session = NULL;
251 
252  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
253  * values required for its functionality.
254  *
255  * For an example of anonymous authentication, see:
256  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
257  */
258 
260  if (prio_base == NULL) {
261  prio_base = PCMK_GNUTLS_PRIORITIES;
262  }
263  prio = crm_strdup_printf("%s:%s", prio_base,
264  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
265 
266  session = gnutls_malloc(sizeof(gnutls_session_t));
267  if (session == NULL) {
268  rc = GNUTLS_E_MEMORY_ERROR;
269  goto error;
270  }
271 
272  rc = gnutls_init(session, conn_type);
273  if (rc != GNUTLS_E_SUCCESS) {
274  goto error;
275  }
276 
277  /* @TODO On the server side, it would be more efficient to cache the
278  * priority with gnutls_priority_init2() and set it with
279  * gnutls_priority_set() for all sessions.
280  */
281  rc = gnutls_priority_set_direct(*session, prio, NULL);
282  if (rc != GNUTLS_E_SUCCESS) {
283  goto error;
284  }
285  if (conn_type == GNUTLS_CLIENT) {
286  set_minimum_dh_bits(session);
287  }
288 
289  gnutls_transport_set_ptr(*session,
290  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
291 
292  rc = gnutls_credentials_set(*session, cred_type, credentials);
293  if (rc != GNUTLS_E_SUCCESS) {
294  goto error;
295  }
296  free(prio);
297  return session;
298 
299 error:
300  crm_err("Could not initialize %s TLS %s session: %s "
301  CRM_XS " rc=%d priority='%s'",
302  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
303  (conn_type == GNUTLS_SERVER)? "server" : "client",
304  gnutls_strerror(rc), rc, prio);
305  free(prio);
306  if (session != NULL) {
307  gnutls_free(session);
308  }
309  return NULL;
310 }
311 
327 int
328 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
329 {
330  int rc = GNUTLS_E_SUCCESS;
331  unsigned int dh_bits = 0;
332 
333  rc = gnutls_dh_params_init(dh_params);
334  if (rc != GNUTLS_E_SUCCESS) {
335  goto error;
336  }
337 
338  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
339  GNUTLS_SEC_PARAM_NORMAL);
340  if (dh_bits == 0) {
341  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
342  goto error;
343  }
344  dh_bits = get_bound_dh_bits(dh_bits);
345 
346  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
347  dh_bits);
348  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
349  if (rc != GNUTLS_E_SUCCESS) {
350  goto error;
351  }
352 
353  return pcmk_rc_ok;
354 
355 error:
356  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
357  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
358  return EPROTO;
359 }
360 
372 int
373 pcmk__read_handshake_data(const pcmk__client_t *client)
374 {
375  int rc = 0;
376 
377  pcmk__assert((client != NULL) && (client->remote != NULL)
378  && (client->remote->tls_session != NULL));
379 
380  do {
381  rc = gnutls_handshake(*client->remote->tls_session);
382  } while (rc == GNUTLS_E_INTERRUPTED);
383 
384  if (rc == GNUTLS_E_AGAIN) {
385  /* No more data is available at the moment. This function should be
386  * invoked again once the client sends more.
387  */
388  return EAGAIN;
389  } else if (rc != GNUTLS_E_SUCCESS) {
390  crm_err("TLS handshake with remote client failed: %s "
391  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
392  return EPROTO;
393  }
394  return pcmk_rc_ok;
395 }
396 
397 // \return Standard Pacemaker return code
398 static int
399 send_tls(gnutls_session_t *session, struct iovec *iov)
400 {
401  const char *unsent = iov->iov_base;
402  size_t unsent_len = iov->iov_len;
403  ssize_t gnutls_rc;
404 
405  if (unsent == NULL) {
406  return EINVAL;
407  }
408 
409  crm_trace("Sending TLS message of %llu bytes",
410  (unsigned long long) unsent_len);
411  while (true) {
412  gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
413 
414  if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
415  crm_trace("Retrying to send %llu bytes remaining",
416  (unsigned long long) unsent_len);
417 
418  } else if (gnutls_rc < 0) {
419  // Caller can log as error if necessary
420  crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
421  gnutls_strerror((int) gnutls_rc),
422  (long long) gnutls_rc);
423  return ECONNABORTED;
424 
425  } else if (gnutls_rc < unsent_len) {
426  crm_trace("Sent %lld of %llu bytes remaining",
427  (long long) gnutls_rc, (unsigned long long) unsent_len);
428  unsent_len -= gnutls_rc;
429  unsent += gnutls_rc;
430  } else {
431  crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
432  break;
433  }
434  }
435  return pcmk_rc_ok;
436 }
437 #endif
438 
439 // \return Standard Pacemaker return code
440 static int
441 send_plaintext(int sock, struct iovec *iov)
442 {
443  const char *unsent = iov->iov_base;
444  size_t unsent_len = iov->iov_len;
445  ssize_t write_rc;
446 
447  if (unsent == NULL) {
448  return EINVAL;
449  }
450 
451  crm_debug("Sending plaintext message of %llu bytes to socket %d",
452  (unsigned long long) unsent_len, sock);
453  while (true) {
454  write_rc = write(sock, unsent, unsent_len);
455  if (write_rc < 0) {
456  int rc = errno;
457 
458  if ((errno == EINTR) || (errno == EAGAIN)) {
459  crm_trace("Retrying to send %llu bytes remaining to socket %d",
460  (unsigned long long) unsent_len, sock);
461  continue;
462  }
463 
464  // Caller can log as error if necessary
465  crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
466  pcmk_rc_str(rc), rc, sock);
467  return rc;
468 
469  } else if (write_rc < unsent_len) {
470  crm_trace("Sent %lld of %llu bytes remaining",
471  (long long) write_rc, (unsigned long long) unsent_len);
472  unsent += write_rc;
473  unsent_len -= write_rc;
474  continue;
475 
476  } else {
477  crm_trace("Sent all %lld bytes remaining: %.100s",
478  (long long) write_rc, (char *) (iov->iov_base));
479  break;
480  }
481  }
482  return pcmk_rc_ok;
483 }
484 
485 // \return Standard Pacemaker return code
486 static int
487 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
488 {
489  int rc = pcmk_rc_ok;
490 
491  for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
492 #ifdef HAVE_GNUTLS_GNUTLS_H
493  if (remote->tls_session) {
494  rc = send_tls(remote->tls_session, &(iov[lpc]));
495  continue;
496  }
497 #endif
498  if (remote->tcp_socket) {
499  rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
500  } else {
501  rc = ESOCKTNOSUPPORT;
502  }
503  }
504  return rc;
505 }
506 
516 int
517 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
518 {
519  int rc = pcmk_rc_ok;
520  static uint64_t id = 0;
521  GString *xml_text = NULL;
522 
523  struct iovec iov[2];
524  struct remote_header_v0 *header;
525 
526  CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
527 
528  xml_text = g_string_sized_new(1024);
529  pcmk__xml_string(msg, 0, xml_text, 0);
530  CRM_CHECK(xml_text->len > 0,
531  g_string_free(xml_text, TRUE); return EINVAL);
532 
533  header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
534 
535  iov[0].iov_base = header;
536  iov[0].iov_len = sizeof(struct remote_header_v0);
537 
538  iov[1].iov_len = 1 + xml_text->len;
539  iov[1].iov_base = g_string_free(xml_text, FALSE);
540 
541  id++;
542  header->id = id;
543  header->endian = ENDIAN_LOCAL;
544  header->version = REMOTE_MSG_VERSION;
545  header->payload_offset = iov[0].iov_len;
546  header->payload_uncompressed = iov[1].iov_len;
547  header->size_total = iov[0].iov_len + iov[1].iov_len;
548 
549  rc = remote_send_iovs(remote, iov, 2);
550  if (rc != pcmk_rc_ok) {
551  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
552  pcmk_rc_str(rc), rc);
553  }
554 
555  free(iov[0].iov_base);
556  g_free((gchar *) iov[1].iov_base);
557  return rc;
558 }
559 
569 xmlNode *
571 {
572  xmlNode *xml = NULL;
573  struct remote_header_v0 *header = localized_remote_header(remote);
574 
575  if (header == NULL) {
576  return NULL;
577  }
578 
579  /* Support compression on the receiving end now, in case we ever want to add it later */
580  if (header->payload_compressed) {
581  int rc = 0;
582  unsigned int size_u = 1 + header->payload_uncompressed;
583  char *uncompressed =
584  pcmk__assert_alloc(1, header->payload_offset + size_u);
585 
586  crm_trace("Decompressing message data %d bytes into %d bytes",
587  header->payload_compressed, size_u);
588 
589  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
590  remote->buffer + header->payload_offset,
591  header->payload_compressed, 1, 0);
592  rc = pcmk__bzlib2rc(rc);
593 
594  if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
595  crm_warn("Couldn't decompress v%d message, we only understand v%d",
596  header->version, REMOTE_MSG_VERSION);
597  free(uncompressed);
598  return NULL;
599 
600  } else if (rc != pcmk_rc_ok) {
601  crm_err("Decompression failed: %s " CRM_XS " rc=%d",
602  pcmk_rc_str(rc), rc);
603  free(uncompressed);
604  return NULL;
605  }
606 
607  pcmk__assert(size_u == header->payload_uncompressed);
608 
609  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
610  remote->buffer_size = header->payload_offset + size_u;
611 
612  free(remote->buffer);
613  remote->buffer = uncompressed;
614  header = localized_remote_header(remote);
615  }
616 
617  /* take ownership of the buffer */
618  remote->buffer_offset = 0;
619 
620  CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
621 
622  xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
623  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
624  crm_warn("Couldn't parse v%d message, we only understand v%d",
625  header->version, REMOTE_MSG_VERSION);
626 
627  } else if (xml == NULL) {
628  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
629  }
630 
631  crm_log_xml_trace(xml, "[remote msg]");
632  return xml;
633 }
634 
635 static int
636 get_remote_socket(const pcmk__remote_t *remote)
637 {
638 #ifdef HAVE_GNUTLS_GNUTLS_H
639  if (remote->tls_session) {
640  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
641 
642  return GPOINTER_TO_INT(sock_ptr);
643  }
644 #endif
645 
646  if (remote->tcp_socket) {
647  return remote->tcp_socket;
648  }
649 
650  crm_err("Remote connection type undetermined (bug?)");
651  return -1;
652 }
653 
665 int
666 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
667 {
668  struct pollfd fds = { 0, };
669  int sock = 0;
670  int rc = 0;
671  time_t start;
672  int timeout = timeout_ms;
673 
674  sock = get_remote_socket(remote);
675  if (sock <= 0) {
676  crm_trace("No longer connected");
677  return ENOTCONN;
678  }
679 
680  start = time(NULL);
681  errno = 0;
682  do {
683  fds.fd = sock;
684  fds.events = POLLIN;
685 
686  /* If we got an EINTR while polling, and we have a
687  * specific timeout we are trying to honor, attempt
688  * to adjust the timeout to the closest second. */
689  if (errno == EINTR && (timeout > 0)) {
690  timeout = timeout_ms - ((time(NULL) - start) * 1000);
691  if (timeout < 1000) {
692  timeout = 1000;
693  }
694  }
695 
696  rc = poll(&fds, 1, timeout);
697  } while (rc < 0 && errno == EINTR);
698 
699  if (rc < 0) {
700  return errno;
701  }
702  return (rc == 0)? ETIME : pcmk_rc_ok;
703 }
704 
717 int
719 {
720  int rc = pcmk_rc_ok;
721  size_t read_len = sizeof(struct remote_header_v0);
722  struct remote_header_v0 *header = localized_remote_header(remote);
723  bool received = false;
724  ssize_t read_rc;
725 
726  if(header) {
727  /* Stop at the end of the current message */
728  read_len = header->size_total;
729  }
730 
731  /* automatically grow the buffer when needed */
732  if(remote->buffer_size < read_len) {
733  remote->buffer_size = 2 * read_len;
734  crm_trace("Expanding buffer to %llu bytes",
735  (unsigned long long) remote->buffer_size);
736  remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
737  }
738 
739 #ifdef HAVE_GNUTLS_GNUTLS_H
740  if (!received && remote->tls_session) {
741  read_rc = gnutls_record_recv(*(remote->tls_session),
742  remote->buffer + remote->buffer_offset,
743  remote->buffer_size - remote->buffer_offset);
744  if (read_rc == GNUTLS_E_INTERRUPTED) {
745  rc = EINTR;
746  } else if (read_rc == GNUTLS_E_AGAIN) {
747  rc = EAGAIN;
748  } else if (read_rc < 0) {
749  crm_debug("TLS receive failed: %s (%lld)",
750  gnutls_strerror(read_rc), (long long) read_rc);
751  rc = EIO;
752  }
753  received = true;
754  }
755 #endif
756 
757  if (!received && remote->tcp_socket) {
758  read_rc = read(remote->tcp_socket,
759  remote->buffer + remote->buffer_offset,
760  remote->buffer_size - remote->buffer_offset);
761  if (read_rc < 0) {
762  rc = errno;
763  }
764  received = true;
765  }
766 
767  if (!received) {
768  crm_err("Remote connection type undetermined (bug?)");
769  return ESOCKTNOSUPPORT;
770  }
771 
772  /* process any errors. */
773  if (read_rc > 0) {
774  remote->buffer_offset += read_rc;
775  /* always null terminate buffer, the +1 to alloc always allows for this. */
776  remote->buffer[remote->buffer_offset] = '\0';
777  crm_trace("Received %lld more bytes (%llu total)",
778  (long long) read_rc,
779  (unsigned long long) remote->buffer_offset);
780 
781  } else if ((rc == EINTR) || (rc == EAGAIN)) {
782  crm_trace("No data available for non-blocking remote read: %s (%d)",
783  pcmk_rc_str(rc), rc);
784 
785  } else if (read_rc == 0) {
786  crm_debug("End of remote data encountered after %llu bytes",
787  (unsigned long long) remote->buffer_offset);
788  return ENOTCONN;
789 
790  } else {
791  crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
792  (unsigned long long) remote->buffer_offset,
793  pcmk_rc_str(rc), rc);
794  return ENOTCONN;
795  }
796 
797  header = localized_remote_header(remote);
798  if(header) {
799  if(remote->buffer_offset < header->size_total) {
800  crm_trace("Read partial remote message (%llu of %u bytes)",
801  (unsigned long long) remote->buffer_offset,
802  header->size_total);
803  } else {
804  crm_trace("Read full remote message of %llu bytes",
805  (unsigned long long) remote->buffer_offset);
806  return pcmk_rc_ok;
807  }
808  }
809 
810  return EAGAIN;
811 }
812 
823 int
825 {
826  int rc = pcmk_rc_ok;
827  time_t start = time(NULL);
828  int remaining_timeout = 0;
829 
830  if (timeout_ms == 0) {
831  timeout_ms = 10000;
832  } else if (timeout_ms < 0) {
833  timeout_ms = 60000;
834  }
835 
836  remaining_timeout = timeout_ms;
837  while (remaining_timeout > 0) {
838 
839  crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
840  remaining_timeout, timeout_ms);
841  rc = pcmk__remote_ready(remote, remaining_timeout);
842 
843  if (rc == ETIME) {
844  crm_err("Timed out (%d ms) while waiting for remote data",
845  remaining_timeout);
846  return rc;
847 
848  } else if (rc != pcmk_rc_ok) {
849  crm_debug("Wait for remote data aborted (will retry): %s "
850  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
851 
852  } else {
854  if (rc == pcmk_rc_ok) {
855  return rc;
856  } else if (rc == EAGAIN) {
857  crm_trace("Waiting for more remote data");
858  } else {
859  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
860  pcmk_rc_str(rc), rc);
861  }
862  }
863 
864  // Don't waste time retrying after fatal errors
865  if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
866  return rc;
867  }
868 
869  remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
870  }
871  return ETIME;
872 }
873 
/* State for one asynchronous (non-blocking) TCP connection attempt, created by
 * connect_socket_retry() and handed to check_connect_finished() as its timer
 * user data (which frees it once the result has been reported)
 */
struct tcp_async_cb_data {
    int sock;           // Socket being connected (set to -1 if closed on failure)
    int timeout_ms;     // Time allowed for the connection (checked in whole seconds)
    time_t start;       // When connect() was started, or 0 if it succeeded at once
    void *userdata;     // Opaque caller data passed back to callback
    void (*callback) (void *userdata, int rc, int sock); // Receives the final result
};
881 
882 // \return TRUE if timer should be rescheduled, FALSE otherwise
883 static gboolean
884 check_connect_finished(gpointer userdata)
885 {
886  struct tcp_async_cb_data *cb_data = userdata;
887  int rc;
888 
889  fd_set rset, wset;
890  struct timeval ts = { 0, };
891 
892  if (cb_data->start == 0) {
893  // Last connect() returned success immediately
894  rc = pcmk_rc_ok;
895  goto dispatch_done;
896  }
897 
898  // If the socket is ready for reading or writing, the connect succeeded
899  FD_ZERO(&rset);
900  FD_SET(cb_data->sock, &rset);
901  wset = rset;
902  rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
903 
904  if (rc < 0) { // select() error
905  rc = errno;
906  if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
907  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
908  return TRUE; // There is time left, so reschedule timer
909  } else {
910  rc = ETIMEDOUT;
911  }
912  }
913  crm_trace("Could not check socket %d for connection success: %s (%d)",
914  cb_data->sock, pcmk_rc_str(rc), rc);
915 
916  } else if (rc == 0) { // select() timeout
917  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
918  return TRUE; // There is time left, so reschedule timer
919  }
920  crm_debug("Timed out while waiting for socket %d connection success",
921  cb_data->sock);
922  rc = ETIMEDOUT;
923 
924  // select() returned number of file descriptors that are ready
925 
926  } else if (FD_ISSET(cb_data->sock, &rset)
927  || FD_ISSET(cb_data->sock, &wset)) {
928 
929  // The socket is ready; check it for connection errors
930  int error = 0;
931  socklen_t len = sizeof(error);
932 
933  if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
934  rc = errno;
935  crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
936  cb_data->sock, pcmk_rc_str(rc), rc);
937  } else if (error != 0) {
938  rc = error;
939  crm_trace("Socket %d connected with error: %s (%d)",
940  cb_data->sock, pcmk_rc_str(rc), rc);
941  } else {
942  rc = pcmk_rc_ok;
943  }
944 
945  } else { // Should not be possible
946  crm_trace("select() succeeded, but socket %d not in resulting "
947  "read/write sets", cb_data->sock);
948  rc = EAGAIN;
949  }
950 
951  dispatch_done:
952  if (rc == pcmk_rc_ok) {
953  crm_trace("Socket %d is connected", cb_data->sock);
954  } else {
955  close(cb_data->sock);
956  cb_data->sock = -1;
957  }
958 
959  if (cb_data->callback) {
960  cb_data->callback(cb_data->userdata, rc, cb_data->sock);
961  }
962  free(cb_data);
963  return FALSE; // Do not reschedule timer
964 }
965 
984 static int
985 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
986  int timeout_ms, int *timer_id, void *userdata,
987  void (*callback) (void *userdata, int rc, int sock))
988 {
989  int rc = 0;
990  int interval = 500;
991  int timer;
992  struct tcp_async_cb_data *cb_data = NULL;
993 
994  rc = pcmk__set_nonblocking(sock);
995  if (rc != pcmk_rc_ok) {
996  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
997  pcmk_rc_str(rc), rc);
998  return rc;
999  }
1000 
1001  rc = connect(sock, addr, addrlen);
1002  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
1003  rc = errno;
1004  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1005  pcmk_rc_str(rc), rc);
1006  return rc;
1007  }
1008 
1009  cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
1010  cb_data->userdata = userdata;
1011  cb_data->callback = callback;
1012  cb_data->sock = sock;
1013  cb_data->timeout_ms = timeout_ms;
1014 
1015  if (rc == 0) {
1016  /* The connect was successful immediately, we still return to mainloop
1017  * and let this callback get called later. This avoids the user of this api
1018  * to have to account for the fact the callback could be invoked within this
1019  * function before returning. */
1020  cb_data->start = 0;
1021  interval = 1;
1022  } else {
1023  cb_data->start = time(NULL);
1024  }
1025 
1026  /* This timer function does a non-blocking poll on the socket to see if we
1027  * can use it. Once we can, the connect has completed. This method allows us
1028  * to connect without blocking the mainloop.
1029  *
1030  * @TODO Use a mainloop fd callback for this instead of polling. Something
1031  * about the way mainloop is currently polling prevents this from
1032  * working at the moment though. (See connect(2) regarding EINPROGRESS
1033  * for possible new handling needed.)
1034  */
1035  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1036  interval, sock);
1037  timer = g_timeout_add(interval, check_connect_finished, cb_data);
1038  if (timer_id) {
1039  *timer_id = timer;
1040  }
1041 
1042  // timer callback should be taking care of cb_data
1043  // cppcheck-suppress memleak
1044  return pcmk_rc_ok;
1045 }
1046 
1057 static int
1058 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1059 {
1060  int rc = connect(sock, addr, addrlen);
1061 
1062  if (rc < 0) {
1063  rc = errno;
1064  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1065  pcmk_rc_str(rc), rc);
1066  return rc;
1067  }
1068 
1069  rc = pcmk__set_nonblocking(sock);
1070  if (rc != pcmk_rc_ok) {
1071  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1072  pcmk_rc_str(rc), rc);
1073  return rc;
1074  }
1075 
1076  return pcmk_ok;
1077 }
1078 
/*!
 * \internal
 * \brief Connect to a remote host over TCP, trying each of its addresses
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Milliseconds to allow for an asynchronous connection
 * \param[out] timer_id  If not NULL and \p callback is given, where to store
 *                       the ID of the timer that checks connection progress
 * \param[out] sock_fd   Where to store the connected (or connecting) socket,
 *                       or -1 on failure
 * \param[in]  userdata  Data to pass to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and call this with
 *                       the final result; otherwise connect synchronously
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = rp->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* On success, clear any error left over from a previous address, so
         * the caller doesn't treat a live socket (or a pending callback) as a
         * failure.
         */
        if (callback) {
            if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
                                     timer_id, userdata, callback) == pcmk_rc_ok) {
                rc = pcmk_rc_ok;
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else if (connect_socket_once(sock, rp->ai_addr,
                                       rp->ai_addrlen) == pcmk_rc_ok) {
            rc = pcmk_rc_ok;
            break; /* Success */
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1189 
/*!
 * \internal
 * \brief Convert an IP socket address to printable text
 *
 * \param[in]  sa  Socket address (IPv4 or IPv6)
 * \param[out] s   Storage for the result (at least INET6_ADDRSTRLEN bytes);
 *                 set to "<invalid>" for any other address family
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const struct sockaddr *generic = sa;

    if (generic->sa_family == AF_INET) {
        const struct sockaddr_in *v4 = sa;

        inet_ntop(AF_INET, &(v4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (generic->sa_family == AF_INET6) {
        const struct sockaddr_in6 *v6 = sa;

        inet_ntop(AF_INET6, &(v6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1220 
1230 int
1231 pcmk__accept_remote_connection(int ssock, int *csock)
1232 {
1233  int rc;
1234  struct sockaddr_storage addr;
1235  socklen_t laddr = sizeof(addr);
1236  char addr_str[INET6_ADDRSTRLEN];
1237 #ifdef TCP_USER_TIMEOUT
1238  long sbd_timeout = 0;
1239 #endif
1240 
1241  /* accept the connection */
1242  memset(&addr, 0, sizeof(addr));
1243  *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1244  if (*csock == -1) {
1245  rc = errno;
1246  crm_err("Could not accept remote client connection: %s "
1247  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1248  return rc;
1249  }
1250  pcmk__sockaddr2str(&addr, addr_str);
1251  crm_info("Accepted new remote client connection from %s", addr_str);
1252 
1253  rc = pcmk__set_nonblocking(*csock);
1254  if (rc != pcmk_rc_ok) {
1255  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1256  pcmk_rc_str(rc), rc);
1257  close(*csock);
1258  *csock = -1;
1259  return rc;
1260  }
1261 
1262 #ifdef TCP_USER_TIMEOUT
1263  sbd_timeout = pcmk__get_sbd_watchdog_timeout();
1264  if (sbd_timeout > 0) {
1265  // Time to fail and retry before watchdog
1266  long half = sbd_timeout / 2;
1267  unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
1268 
1269  rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1270  &optval, sizeof(optval));
1271  if (rc < 0) {
1272  rc = errno;
1273  crm_err("Could not set TCP timeout to %d ms on remote connection: "
1274  "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1275  close(*csock);
1276  *csock = -1;
1277  return rc;
1278  }
1279  }
1280 #endif
1281 
1282  return rc;
1283 }
1284 
1290 int
1292 {
1293  static int port = 0;
1294 
1295  if (port == 0) {
1296  const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1297 
1298  if (env) {
1299  errno = 0;
1300  port = strtol(env, NULL, 10);
1301  if (errno || (port < 1) || (port > 65535)) {
1302  crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1303  " has invalid value '%s', using %d instead",
1304  env, DEFAULT_REMOTE_PORT);
1305  port = DEFAULT_REMOTE_PORT;
1306  }
1307  } else {
1308  port = DEFAULT_REMOTE_PORT;
1309  }
1310  }
1311  return port;
1312 }
pcmk__cpg_host_t host
Definition: cpg.c:52
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:245
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition: io.c:524
#define ETIME
Definition: portability.h:111
#define PCMK__ENV_DH_MIN_BITS
uint32_t payload_compressed
Definition: remote.c:218
int pcmk__scan_min_int(const char *text, int *result, int minimum)
Definition: strings.c:126
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition: xml_io.c:490
size_t buffer_offset
Definition: ipc_internal.h:114
#define PCMK__ENV_REMOTE_PORT
uint32_t payload_uncompressed
Definition: remote.c:219
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:228
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1291
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:503
#define ENDIAN_LOCAL
Definition: remote.c:70
const char * pcmk__env_option(const char *option)
Definition: options.c:1094
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:212
void pcmk__sockaddr2str(const void *sa, char *s)
Definition: remote.c:1202
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:67
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:541
#define PCMK__ENV_TLS_PRIORITIES
#define crm_warn(fmt, args...)
Definition: logging.h:394
#define crm_debug(fmt, args...)
Definition: logging.h:402
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:666
#define crm_trace(fmt, args...)
Definition: logging.h:404
uint64_t id
Definition: remote.c:214
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1, 2)
#define __swab64(x)
Definition: remote.c:58
size_t buffer_size
Definition: ipc_internal.h:113
Wrappers for and extensions to libxml2.
#define PCMK__NELEM(a)
Definition: internal.h:48
int pcmk__read_available_remote_data(pcmk__remote_t *remote)
Definition: remote.c:718
long pcmk__get_sbd_watchdog_timeout(void)
Definition: watchdog.c:243
uint32_t payload_offset
Definition: remote.c:217
xmlNode * pcmk__xml_parse(const char *input)
Definition: xml_io.c:245
struct tcp_async_cb_data __attribute__
#define pcmk__assert(expr)
uint32_t size_total
Definition: remote.c:216
#define CRM_XS
Definition: logging.h:56
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition: results.c:908
#define __swab32(x)
Definition: remote.c:52
#define REMOTE_MSG_VERSION
Definition: remote.c:69
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
Definition: results.c:867
#define crm_err(fmt, args...)
Definition: logging.h:391
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:1231
#define pcmk_ok
Definition: results.h:65
#define crm_log_xml_trace(xml, text)
Definition: logging.h:412
#define PCMK__ENV_DH_MAX_BITS
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:1096
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:824
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:570
struct pcmk__remote_s * remote
Definition: ipc_internal.h:192
#define pcmk__assert_alloc(nmemb, size)
Definition: internal.h:297
unsigned int timeout
Definition: pcmk_fence.c:32
#define crm_info(fmt, args...)
Definition: logging.h:399
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
Definition: remote.c:517
uint32_t version
Definition: remote.c:213
uint64_t flags
Definition: remote.c:215