pacemaker  2.1.8-3980678f03
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2024 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12 
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h> // PRIx32
27 
28 #include <glib.h>
29 #include <bzlib.h>
30 
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
35 
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39 
/* Byte-swapping macros, with the same names as those in linux/swab.h */
#ifdef HAVE_LINUX_SWAB_H
#  include <linux/swab.h>
#else
/*
 * Fallback definitions for platforms without linux/swab.h.
 *
 * Every operand is cast to a fixed-width type first, because there is no
 * portable way to know how U/UL/ULL constants map to 16-, 32-, and 64-bit
 * types. Each byte is moved into place with a shift and then isolated with a
 * mask.
 */
#define __swab16(x) ((uint16_t)(                                        \
    (((uint16_t)(x) << 8) & (uint16_t)0xff00U) |                        \
    (((uint16_t)(x) >> 8) & (uint16_t)0x00ffU)))

#define __swab32(x) ((uint32_t)(                                        \
    (((uint32_t)(x) << 24) & (uint32_t)0xff000000UL) |                  \
    (((uint32_t)(x) <<  8) & (uint32_t)0x00ff0000UL) |                  \
    (((uint32_t)(x) >>  8) & (uint32_t)0x0000ff00UL) |                  \
    (((uint32_t)(x) >> 24) & (uint32_t)0x000000ffUL)))

#define __swab64(x) ((uint64_t)(                                        \
    (((uint64_t)(x) << 56) & (uint64_t)0xff00000000000000ULL) |         \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) |         \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) |         \
    (((uint64_t)(x) <<  8) & (uint64_t)0x000000ff00000000ULL) |         \
    (((uint64_t)(x) >>  8) & (uint64_t)0x00000000ff000000ULL) |         \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) |         \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) |         \
    (((uint64_t)(x) >> 56) & (uint64_t)0x00000000000000ffULL)))
#endif
68 
69 #define REMOTE_MSG_VERSION 1
70 #define ENDIAN_LOCAL 0xBADADBBD
71 
/*
 * Fixed-size header placed in front of the payload of every remote message.
 *
 * The structure is packed. localized_remote_header() byte-swaps every field
 * in place when the endian marker shows that the sender's byte order differs
 * from the local one.
 */
72 struct remote_header_v0 {
73  uint32_t endian; /* Detect messages from hosts with different endian-ness */
    /* Senders in this file set this to REMOTE_MSG_VERSION */
74  uint32_t version;
    /* Senders in this file use a counter incremented for each message sent */
75  uint64_t id;
    /* Not set explicitly by the sender in this file */
76  uint64_t flags;
    /* Header length plus payload length, in bytes */
77  uint32_t size_total;
    /* Offset of the payload from the start of the message */
78  uint32_t payload_offset;
    /* Compressed payload length in bytes; nonzero means the receiver must
     * decompress the payload with bzip2
     */
79  uint32_t payload_compressed;
    /* Uncompressed payload length; senders in this file include the
     * terminating null byte
     */
80  uint32_t payload_uncompressed;
81 
82  /* New fields get added here */
83 
84 } __attribute__ ((packed));
85 
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100  struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101  if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102  return NULL;
103 
104  } else if(header->endian != ENDIAN_LOCAL) {
105  uint32_t endian = __swab32(header->endian);
106 
108  if(endian != ENDIAN_LOCAL) {
109  crm_err("Invalid message detected, endian mismatch: %" PRIx32
110  " is neither %" PRIx32 " nor the swab'd %" PRIx32,
111  ENDIAN_LOCAL, header->endian, endian);
112  return NULL;
113  }
114 
115  header->id = __swab64(header->id);
116  header->flags = __swab64(header->flags);
117  header->endian = __swab32(header->endian);
118 
119  header->version = __swab32(header->version);
120  header->size_total = __swab32(header->size_total);
121  header->payload_offset = __swab32(header->payload_offset);
122  header->payload_compressed = __swab32(header->payload_compressed);
123  header->payload_uncompressed = __swab32(header->payload_uncompressed);
124  }
125 
126  return header;
127 }
128 
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130 
131 int
132 pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_sec,
133  int *gnutls_rc)
134 {
135  const time_t time_limit = time(NULL) + timeout_sec;
136 
137  if (gnutls_rc != NULL) {
138  *gnutls_rc = GNUTLS_E_SUCCESS;
139  }
140  do {
141  int rc = gnutls_handshake(*remote->tls_session);
142 
143  switch (rc) {
144  case GNUTLS_E_SUCCESS:
145  return pcmk_rc_ok;
146 
147  case GNUTLS_E_INTERRUPTED:
148  case GNUTLS_E_AGAIN:
149  rc = pcmk__remote_ready(remote, 1000);
150  if ((rc != pcmk_rc_ok) && (rc != ETIME)) { // Fatal error
151  return rc;
152  }
153  break;
154 
155  default:
156  if (gnutls_rc != NULL) {
157  *gnutls_rc = rc;
158  }
159  return EPROTO;
160  }
161  } while (time(NULL) < time_limit);
162  return ETIME;
163 }
164 
171 static void
172 set_minimum_dh_bits(const gnutls_session_t *session)
173 {
174  int dh_min_bits;
175 
177  0);
178 
179  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
180  * the priority string imply the DH requirements, but this is the only
181  * way to give the user control over compatibility with older servers.
182  */
183  if (dh_min_bits > 0) {
184  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
185  dh_min_bits);
186  crm_warn("Support for the " PCMK__ENV_DH_MIN_BITS " "
187  "environment variable is deprecated and will be removed "
188  "in a future release");
189  gnutls_dh_set_prime_bits(*session, dh_min_bits);
190  }
191 }
192 
193 static unsigned int
194 get_bound_dh_bits(unsigned int dh_bits)
195 {
196  int dh_min_bits;
197  int dh_max_bits;
198 
200  0);
202  0);
203 
204  if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
205  crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
206  dh_max_bits = 0;
207  }
208  if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
209  return dh_min_bits;
210  }
211  if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
212  return dh_max_bits;
213  }
214  return dh_bits;
215 }
228 gnutls_session_t *
229 pcmk__new_tls_session(int csock, unsigned int conn_type,
230  gnutls_credentials_type_t cred_type, void *credentials)
231 {
232  int rc = GNUTLS_E_SUCCESS;
233  const char *prio_base = NULL;
234  char *prio = NULL;
235  gnutls_session_t *session = NULL;
236 
237  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
238  * values required for its functionality.
239  *
240  * For an example of anonymous authentication, see:
241  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
242  */
243 
245  if (prio_base == NULL) {
246  prio_base = PCMK_GNUTLS_PRIORITIES;
247  }
248  prio = crm_strdup_printf("%s:%s", prio_base,
249  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
250 
251  session = gnutls_malloc(sizeof(gnutls_session_t));
252  if (session == NULL) {
253  rc = GNUTLS_E_MEMORY_ERROR;
254  goto error;
255  }
256 
257  rc = gnutls_init(session, conn_type);
258  if (rc != GNUTLS_E_SUCCESS) {
259  goto error;
260  }
261 
262  /* @TODO On the server side, it would be more efficient to cache the
263  * priority with gnutls_priority_init2() and set it with
264  * gnutls_priority_set() for all sessions.
265  */
266  rc = gnutls_priority_set_direct(*session, prio, NULL);
267  if (rc != GNUTLS_E_SUCCESS) {
268  goto error;
269  }
270  if (conn_type == GNUTLS_CLIENT) {
271  set_minimum_dh_bits(session);
272  }
273 
274  gnutls_transport_set_ptr(*session,
275  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
276 
277  rc = gnutls_credentials_set(*session, cred_type, credentials);
278  if (rc != GNUTLS_E_SUCCESS) {
279  goto error;
280  }
281  free(prio);
282  return session;
283 
284 error:
285  crm_err("Could not initialize %s TLS %s session: %s "
286  CRM_XS " rc=%d priority='%s'",
287  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
288  (conn_type == GNUTLS_SERVER)? "server" : "client",
289  gnutls_strerror(rc), rc, prio);
290  free(prio);
291  if (session != NULL) {
292  gnutls_free(session);
293  }
294  return NULL;
295 }
296 
312 int
313 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
314 {
315  int rc = GNUTLS_E_SUCCESS;
316  unsigned int dh_bits = 0;
317 
318  rc = gnutls_dh_params_init(dh_params);
319  if (rc != GNUTLS_E_SUCCESS) {
320  goto error;
321  }
322 
323  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
324  GNUTLS_SEC_PARAM_NORMAL);
325  if (dh_bits == 0) {
326  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
327  goto error;
328  }
329  dh_bits = get_bound_dh_bits(dh_bits);
330 
331  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
332  dh_bits);
333  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
334  if (rc != GNUTLS_E_SUCCESS) {
335  goto error;
336  }
337 
338  return pcmk_rc_ok;
339 
340 error:
341  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
342  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
343  return EPROTO;
344 }
345 
357 int
358 pcmk__read_handshake_data(const pcmk__client_t *client)
359 {
360  int rc = 0;
361 
362  CRM_ASSERT(client && client->remote && client->remote->tls_session);
363 
364  do {
365  rc = gnutls_handshake(*client->remote->tls_session);
366  } while (rc == GNUTLS_E_INTERRUPTED);
367 
368  if (rc == GNUTLS_E_AGAIN) {
369  /* No more data is available at the moment. This function should be
370  * invoked again once the client sends more.
371  */
372  return EAGAIN;
373  } else if (rc != GNUTLS_E_SUCCESS) {
374  crm_err("TLS handshake with remote client failed: %s "
375  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
376  return EPROTO;
377  }
378  return pcmk_rc_ok;
379 }
380 
381 // \return Standard Pacemaker return code
382 static int
383 send_tls(gnutls_session_t *session, struct iovec *iov)
384 {
385  const char *unsent = iov->iov_base;
386  size_t unsent_len = iov->iov_len;
387  ssize_t gnutls_rc;
388 
389  if (unsent == NULL) {
390  return EINVAL;
391  }
392 
393  crm_trace("Sending TLS message of %llu bytes",
394  (unsigned long long) unsent_len);
395  while (true) {
396  gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
397 
398  if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
399  crm_trace("Retrying to send %llu bytes remaining",
400  (unsigned long long) unsent_len);
401 
402  } else if (gnutls_rc < 0) {
403  // Caller can log as error if necessary
404  crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
405  gnutls_strerror((int) gnutls_rc),
406  (long long) gnutls_rc);
407  return ECONNABORTED;
408 
409  } else if (gnutls_rc < unsent_len) {
410  crm_trace("Sent %lld of %llu bytes remaining",
411  (long long) gnutls_rc, (unsigned long long) unsent_len);
412  unsent_len -= gnutls_rc;
413  unsent += gnutls_rc;
414  } else {
415  crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
416  break;
417  }
418  }
419  return pcmk_rc_ok;
420 }
421 #endif
422 
423 // \return Standard Pacemaker return code
424 static int
425 send_plaintext(int sock, struct iovec *iov)
426 {
427  const char *unsent = iov->iov_base;
428  size_t unsent_len = iov->iov_len;
429  ssize_t write_rc;
430 
431  if (unsent == NULL) {
432  return EINVAL;
433  }
434 
435  crm_debug("Sending plaintext message of %llu bytes to socket %d",
436  (unsigned long long) unsent_len, sock);
437  while (true) {
438  write_rc = write(sock, unsent, unsent_len);
439  if (write_rc < 0) {
440  int rc = errno;
441 
442  if ((errno == EINTR) || (errno == EAGAIN)) {
443  crm_trace("Retrying to send %llu bytes remaining to socket %d",
444  (unsigned long long) unsent_len, sock);
445  continue;
446  }
447 
448  // Caller can log as error if necessary
449  crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
450  pcmk_rc_str(rc), rc, sock);
451  return rc;
452 
453  } else if (write_rc < unsent_len) {
454  crm_trace("Sent %lld of %llu bytes remaining",
455  (long long) write_rc, (unsigned long long) unsent_len);
456  unsent += write_rc;
457  unsent_len -= write_rc;
458  continue;
459 
460  } else {
461  crm_trace("Sent all %lld bytes remaining: %.100s",
462  (long long) write_rc, (char *) (iov->iov_base));
463  break;
464  }
465  }
466  return pcmk_rc_ok;
467 }
468 
469 // \return Standard Pacemaker return code
470 static int
471 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
472 {
473  int rc = pcmk_rc_ok;
474 
475  for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
476 #ifdef HAVE_GNUTLS_GNUTLS_H
477  if (remote->tls_session) {
478  rc = send_tls(remote->tls_session, &(iov[lpc]));
479  continue;
480  }
481 #endif
482  if (remote->tcp_socket) {
483  rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
484  } else {
485  rc = ESOCKTNOSUPPORT;
486  }
487  }
488  return rc;
489 }
490 
500 int
501 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
502 {
503  int rc = pcmk_rc_ok;
504  static uint64_t id = 0;
505  GString *xml_text = NULL;
506 
507  struct iovec iov[2];
508  struct remote_header_v0 *header;
509 
510  CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
511 
512  xml_text = g_string_sized_new(1024);
513  pcmk__xml_string(msg, 0, xml_text, 0);
514  CRM_CHECK(xml_text->len > 0,
515  g_string_free(xml_text, TRUE); return EINVAL);
516 
517  header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
518 
519  iov[0].iov_base = header;
520  iov[0].iov_len = sizeof(struct remote_header_v0);
521 
522  iov[1].iov_len = 1 + xml_text->len;
523  iov[1].iov_base = g_string_free(xml_text, FALSE);
524 
525  id++;
526  header->id = id;
527  header->endian = ENDIAN_LOCAL;
528  header->version = REMOTE_MSG_VERSION;
529  header->payload_offset = iov[0].iov_len;
530  header->payload_uncompressed = iov[1].iov_len;
531  header->size_total = iov[0].iov_len + iov[1].iov_len;
532 
533  rc = remote_send_iovs(remote, iov, 2);
534  if (rc != pcmk_rc_ok) {
535  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
536  pcmk_rc_str(rc), rc);
537  }
538 
539  free(iov[0].iov_base);
540  g_free((gchar *) iov[1].iov_base);
541  return rc;
542 }
543 
553 xmlNode *
555 {
556  xmlNode *xml = NULL;
557  struct remote_header_v0 *header = localized_remote_header(remote);
558 
559  if (header == NULL) {
560  return NULL;
561  }
562 
563  /* Support compression on the receiving end now, in case we ever want to add it later */
564  if (header->payload_compressed) {
565  int rc = 0;
566  unsigned int size_u = 1 + header->payload_uncompressed;
567  char *uncompressed =
568  pcmk__assert_alloc(1, header->payload_offset + size_u);
569 
570  crm_trace("Decompressing message data %d bytes into %d bytes",
571  header->payload_compressed, size_u);
572 
573  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
574  remote->buffer + header->payload_offset,
575  header->payload_compressed, 1, 0);
576  rc = pcmk__bzlib2rc(rc);
577 
578  if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
579  crm_warn("Couldn't decompress v%d message, we only understand v%d",
580  header->version, REMOTE_MSG_VERSION);
581  free(uncompressed);
582  return NULL;
583 
584  } else if (rc != pcmk_rc_ok) {
585  crm_err("Decompression failed: %s " CRM_XS " rc=%d",
586  pcmk_rc_str(rc), rc);
587  free(uncompressed);
588  return NULL;
589  }
590 
591  CRM_ASSERT(size_u == header->payload_uncompressed);
592 
593  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
594  remote->buffer_size = header->payload_offset + size_u;
595 
596  free(remote->buffer);
597  remote->buffer = uncompressed;
598  header = localized_remote_header(remote);
599  }
600 
601  /* take ownership of the buffer */
602  remote->buffer_offset = 0;
603 
604  CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
605 
606  xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
607  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
608  crm_warn("Couldn't parse v%d message, we only understand v%d",
609  header->version, REMOTE_MSG_VERSION);
610 
611  } else if (xml == NULL) {
612  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
613  }
614 
615  return xml;
616 }
617 
618 static int
619 get_remote_socket(const pcmk__remote_t *remote)
620 {
621 #ifdef HAVE_GNUTLS_GNUTLS_H
622  if (remote->tls_session) {
623  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
624 
625  return GPOINTER_TO_INT(sock_ptr);
626  }
627 #endif
628 
629  if (remote->tcp_socket) {
630  return remote->tcp_socket;
631  }
632 
633  crm_err("Remote connection type undetermined (bug?)");
634  return -1;
635 }
636 
648 int
649 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
650 {
651  struct pollfd fds = { 0, };
652  int sock = 0;
653  int rc = 0;
654  time_t start;
655  int timeout = timeout_ms;
656 
657  sock = get_remote_socket(remote);
658  if (sock <= 0) {
659  crm_trace("No longer connected");
660  return ENOTCONN;
661  }
662 
663  start = time(NULL);
664  errno = 0;
665  do {
666  fds.fd = sock;
667  fds.events = POLLIN;
668 
669  /* If we got an EINTR while polling, and we have a
670  * specific timeout we are trying to honor, attempt
671  * to adjust the timeout to the closest second. */
672  if (errno == EINTR && (timeout > 0)) {
673  timeout = timeout_ms - ((time(NULL) - start) * 1000);
674  if (timeout < 1000) {
675  timeout = 1000;
676  }
677  }
678 
679  rc = poll(&fds, 1, timeout);
680  } while (rc < 0 && errno == EINTR);
681 
682  if (rc < 0) {
683  return errno;
684  }
685  return (rc == 0)? ETIME : pcmk_rc_ok;
686 }
687 
700 static int
701 read_available_remote_data(pcmk__remote_t *remote)
702 {
703  int rc = pcmk_rc_ok;
704  size_t read_len = sizeof(struct remote_header_v0);
705  struct remote_header_v0 *header = localized_remote_header(remote);
706  bool received = false;
707  ssize_t read_rc;
708 
709  if(header) {
710  /* Stop at the end of the current message */
711  read_len = header->size_total;
712  }
713 
714  /* automatically grow the buffer when needed */
715  if(remote->buffer_size < read_len) {
716  remote->buffer_size = 2 * read_len;
717  crm_trace("Expanding buffer to %llu bytes",
718  (unsigned long long) remote->buffer_size);
719  remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
720  }
721 
722 #ifdef HAVE_GNUTLS_GNUTLS_H
723  if (!received && remote->tls_session) {
724  read_rc = gnutls_record_recv(*(remote->tls_session),
725  remote->buffer + remote->buffer_offset,
726  remote->buffer_size - remote->buffer_offset);
727  if (read_rc == GNUTLS_E_INTERRUPTED) {
728  rc = EINTR;
729  } else if (read_rc == GNUTLS_E_AGAIN) {
730  rc = EAGAIN;
731  } else if (read_rc < 0) {
732  crm_debug("TLS receive failed: %s (%lld)",
733  gnutls_strerror(read_rc), (long long) read_rc);
734  rc = EIO;
735  }
736  received = true;
737  }
738 #endif
739 
740  if (!received && remote->tcp_socket) {
741  read_rc = read(remote->tcp_socket,
742  remote->buffer + remote->buffer_offset,
743  remote->buffer_size - remote->buffer_offset);
744  if (read_rc < 0) {
745  rc = errno;
746  }
747  received = true;
748  }
749 
750  if (!received) {
751  crm_err("Remote connection type undetermined (bug?)");
752  return ESOCKTNOSUPPORT;
753  }
754 
755  /* process any errors. */
756  if (read_rc > 0) {
757  remote->buffer_offset += read_rc;
758  /* always null terminate buffer, the +1 to alloc always allows for this. */
759  remote->buffer[remote->buffer_offset] = '\0';
760  crm_trace("Received %lld more bytes (%llu total)",
761  (long long) read_rc,
762  (unsigned long long) remote->buffer_offset);
763 
764  } else if ((rc == EINTR) || (rc == EAGAIN)) {
765  crm_trace("No data available for non-blocking remote read: %s (%d)",
766  pcmk_rc_str(rc), rc);
767 
768  } else if (read_rc == 0) {
769  crm_debug("End of remote data encountered after %llu bytes",
770  (unsigned long long) remote->buffer_offset);
771  return ENOTCONN;
772 
773  } else {
774  crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
775  (unsigned long long) remote->buffer_offset,
776  pcmk_rc_str(rc), rc);
777  return ENOTCONN;
778  }
779 
780  header = localized_remote_header(remote);
781  if(header) {
782  if(remote->buffer_offset < header->size_total) {
783  crm_trace("Read partial remote message (%llu of %u bytes)",
784  (unsigned long long) remote->buffer_offset,
785  header->size_total);
786  } else {
787  crm_trace("Read full remote message of %llu bytes",
788  (unsigned long long) remote->buffer_offset);
789  return pcmk_rc_ok;
790  }
791  }
792 
793  return EAGAIN;
794 }
795 
806 int
808 {
809  int rc = pcmk_rc_ok;
810  time_t start = time(NULL);
811  int remaining_timeout = 0;
812 
813  if (timeout_ms == 0) {
814  timeout_ms = 10000;
815  } else if (timeout_ms < 0) {
816  timeout_ms = 60000;
817  }
818 
819  remaining_timeout = timeout_ms;
820  while (remaining_timeout > 0) {
821 
822  crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
823  remaining_timeout, timeout_ms);
824  rc = pcmk__remote_ready(remote, remaining_timeout);
825 
826  if (rc == ETIME) {
827  crm_err("Timed out (%d ms) while waiting for remote data",
828  remaining_timeout);
829  return rc;
830 
831  } else if (rc != pcmk_rc_ok) {
832  crm_debug("Wait for remote data aborted (will retry): %s "
833  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
834 
835  } else {
836  rc = read_available_remote_data(remote);
837  if (rc == pcmk_rc_ok) {
838  return rc;
839  } else if (rc == EAGAIN) {
840  crm_trace("Waiting for more remote data");
841  } else {
842  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
843  pcmk_rc_str(rc), rc);
844  }
845  }
846 
847  // Don't waste time retrying after fatal errors
848  if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
849  return rc;
850  }
851 
852  remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
853  }
854  return ETIME;
855 }
856 
/*
 * State handed to the check_connect_finished() timer callback while a
 * non-blocking connect() is pending. It is allocated by
 * connect_socket_retry() and freed by check_connect_finished() once the
 * outcome is known.
 */
857 struct tcp_async_cb_data {
    /* Socket being connected; set to -1 before the callback on failure */
858  int sock;
    /* How long to keep checking for completion, in milliseconds */
859  int timeout_ms;
    /* When the connect attempt began, or 0 if connect() succeeded at once */
860  time_t start;
    /* Opaque pointer passed through to the callback */
861  void *userdata;
    /* Invoked with the final result code and the socket (or -1) */
862  void (*callback) (void *userdata, int rc, int sock);
863 };
864 
865 // \return TRUE if timer should be rescheduled, FALSE otherwise
866 static gboolean
867 check_connect_finished(gpointer userdata)
868 {
869  struct tcp_async_cb_data *cb_data = userdata;
870  int rc;
871 
872  fd_set rset, wset;
873  struct timeval ts = { 0, };
874 
875  if (cb_data->start == 0) {
876  // Last connect() returned success immediately
877  rc = pcmk_rc_ok;
878  goto dispatch_done;
879  }
880 
881  // If the socket is ready for reading or writing, the connect succeeded
882  FD_ZERO(&rset);
883  FD_SET(cb_data->sock, &rset);
884  wset = rset;
885  rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
886 
887  if (rc < 0) { // select() error
888  rc = errno;
889  if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
890  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
891  return TRUE; // There is time left, so reschedule timer
892  } else {
893  rc = ETIMEDOUT;
894  }
895  }
896  crm_trace("Could not check socket %d for connection success: %s (%d)",
897  cb_data->sock, pcmk_rc_str(rc), rc);
898 
899  } else if (rc == 0) { // select() timeout
900  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
901  return TRUE; // There is time left, so reschedule timer
902  }
903  crm_debug("Timed out while waiting for socket %d connection success",
904  cb_data->sock);
905  rc = ETIMEDOUT;
906 
907  // select() returned number of file descriptors that are ready
908 
909  } else if (FD_ISSET(cb_data->sock, &rset)
910  || FD_ISSET(cb_data->sock, &wset)) {
911 
912  // The socket is ready; check it for connection errors
913  int error = 0;
914  socklen_t len = sizeof(error);
915 
916  if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
917  rc = errno;
918  crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
919  cb_data->sock, pcmk_rc_str(rc), rc);
920  } else if (error != 0) {
921  rc = error;
922  crm_trace("Socket %d connected with error: %s (%d)",
923  cb_data->sock, pcmk_rc_str(rc), rc);
924  } else {
925  rc = pcmk_rc_ok;
926  }
927 
928  } else { // Should not be possible
929  crm_trace("select() succeeded, but socket %d not in resulting "
930  "read/write sets", cb_data->sock);
931  rc = EAGAIN;
932  }
933 
934  dispatch_done:
935  if (rc == pcmk_rc_ok) {
936  crm_trace("Socket %d is connected", cb_data->sock);
937  } else {
938  close(cb_data->sock);
939  cb_data->sock = -1;
940  }
941 
942  if (cb_data->callback) {
943  cb_data->callback(cb_data->userdata, rc, cb_data->sock);
944  }
945  free(cb_data);
946  return FALSE; // Do not reschedule timer
947 }
948 
967 static int
968 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
969  int timeout_ms, int *timer_id, void *userdata,
970  void (*callback) (void *userdata, int rc, int sock))
971 {
972  int rc = 0;
973  int interval = 500;
974  int timer;
975  struct tcp_async_cb_data *cb_data = NULL;
976 
977  rc = pcmk__set_nonblocking(sock);
978  if (rc != pcmk_rc_ok) {
979  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
980  pcmk_rc_str(rc), rc);
981  return rc;
982  }
983 
984  rc = connect(sock, addr, addrlen);
985  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
986  rc = errno;
987  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
988  pcmk_rc_str(rc), rc);
989  return rc;
990  }
991 
992  cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
993  cb_data->userdata = userdata;
994  cb_data->callback = callback;
995  cb_data->sock = sock;
996  cb_data->timeout_ms = timeout_ms;
997 
998  if (rc == 0) {
999  /* The connect was successful immediately, we still return to mainloop
1000  * and let this callback get called later. This avoids the user of this api
1001  * to have to account for the fact the callback could be invoked within this
1002  * function before returning. */
1003  cb_data->start = 0;
1004  interval = 1;
1005  } else {
1006  cb_data->start = time(NULL);
1007  }
1008 
1009  /* This timer function does a non-blocking poll on the socket to see if we
1010  * can use it. Once we can, the connect has completed. This method allows us
1011  * to connect without blocking the mainloop.
1012  *
1013  * @TODO Use a mainloop fd callback for this instead of polling. Something
1014  * about the way mainloop is currently polling prevents this from
1015  * working at the moment though. (See connect(2) regarding EINPROGRESS
1016  * for possible new handling needed.)
1017  */
1018  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1019  interval, sock);
1020  timer = g_timeout_add(interval, check_connect_finished, cb_data);
1021  if (timer_id) {
1022  *timer_id = timer;
1023  }
1024 
1025  // timer callback should be taking care of cb_data
1026  // cppcheck-suppress memleak
1027  return pcmk_rc_ok;
1028 }
1029 
1040 static int
1041 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1042 {
1043  int rc = connect(sock, addr, addrlen);
1044 
1045  if (rc < 0) {
1046  rc = errno;
1047  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1048  pcmk_rc_str(rc), rc);
1049  return rc;
1050  }
1051 
1052  rc = pcmk__set_nonblocking(sock);
1053  if (rc != pcmk_rc_ok) {
1054  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1055  pcmk_rc_str(rc), rc);
1056  return rc;
1057  }
1058 
1059  return pcmk_ok;
1060 }
1061 
/*!
 * \internal
 * \brief Connect to a server at a specified TCP port
 *
 * Each address that the host name resolves to is tried in order until one
 * connects (or, when \p callback is given, until a non-blocking connection
 * attempt has been started successfully).
 *
 * \param[in]  host      Name of server to connect to
 * \param[in]  port      Server port to connect to
 * \param[in]  timeout   When \p callback is given, how long to wait for the
 *                       connection to complete, in milliseconds
 * \param[out] timer_id  If not NULL and \p callback is given, where to store
 *                       the ID of the timer that checks for completion
 * \param[out] sock_fd   Where to store the socket file descriptor (-1 if no
 *                       connection could be made or started)
 * \param[in]  userdata  User data to pass to \p callback
 * \param[in]  callback  If NULL, connect synchronously; otherwise connect
 *                       asynchronously and report the result to this function
 *
 * \return Standard Pacemaker return code
 * \note With a callback, pcmk_rc_ok only means the attempt was started; the
 *       final outcome is reported to the callback.
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = res->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* Store this attempt's result in rc, so that an error left over from
         * an earlier address is not returned after a later address succeeds.
         */
        if (callback) {
            rc = connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
                                      timeout, timer_id, userdata, callback);
            if (rc == pcmk_rc_ok) {
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else {
            rc = connect_socket_once(sock, rp->ai_addr, rp->ai_addrlen);
            if (rc == pcmk_rc_ok) {
                break;          /* Success */
            }
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1172 
/*!
 * \internal
 * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 *
 * \param[in]  sa  Socket address (struct sockaddr_in or struct sockaddr_in6,
 *                 chosen by its address family)
 * \param[out] s   Where to store the string; the buffer must hold at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note Any other address family yields the string "<invalid>".
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const int family = ((const struct sockaddr *) sa)->sa_family;
    const void *raw_address = NULL;

    if (family == AF_INET) {
        raw_address = &(((const struct sockaddr_in *) sa)->sin_addr);

    } else if (family == AF_INET6) {
        raw_address = &(((const struct sockaddr_in6 *) sa)->sin6_addr);
    }

    if (raw_address == NULL) {
        strcpy(s, "<invalid>");
        return;
    }
    inet_ntop(family, raw_address, s, INET6_ADDRSTRLEN);
}
1203 
1213 int
1214 pcmk__accept_remote_connection(int ssock, int *csock)
1215 {
1216  int rc;
1217  struct sockaddr_storage addr;
1218  socklen_t laddr = sizeof(addr);
1219  char addr_str[INET6_ADDRSTRLEN];
1220 #ifdef TCP_USER_TIMEOUT
1221  long sbd_timeout = 0;
1222 #endif
1223 
1224  /* accept the connection */
1225  memset(&addr, 0, sizeof(addr));
1226  *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1227  if (*csock == -1) {
1228  rc = errno;
1229  crm_err("Could not accept remote client connection: %s "
1230  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1231  return rc;
1232  }
1233  pcmk__sockaddr2str(&addr, addr_str);
1234  crm_info("Accepted new remote client connection from %s", addr_str);
1235 
1236  rc = pcmk__set_nonblocking(*csock);
1237  if (rc != pcmk_rc_ok) {
1238  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1239  pcmk_rc_str(rc), rc);
1240  close(*csock);
1241  *csock = -1;
1242  return rc;
1243  }
1244 
1245 #ifdef TCP_USER_TIMEOUT
1246  sbd_timeout = pcmk__get_sbd_watchdog_timeout();
1247  if (sbd_timeout > 0) {
1248  // Time to fail and retry before watchdog
1249  long half = sbd_timeout / 2;
1250  unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
1251 
1252  rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1253  &optval, sizeof(optval));
1254  if (rc < 0) {
1255  rc = errno;
1256  crm_err("Could not set TCP timeout to %d ms on remote connection: "
1257  "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1258  close(*csock);
1259  *csock = -1;
1260  return rc;
1261  }
1262  }
1263 #endif
1264 
1265  return rc;
1266 }
1267 
1273 int
1275 {
1276  static int port = 0;
1277 
1278  if (port == 0) {
1279  const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1280 
1281  if (env) {
1282  errno = 0;
1283  port = strtol(env, NULL, 10);
1284  if (errno || (port < 1) || (port > 65535)) {
1285  crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1286  " has invalid value '%s', using %d instead",
1287  env, DEFAULT_REMOTE_PORT);
1288  port = DEFAULT_REMOTE_PORT;
1289  }
1290  } else {
1291  port = DEFAULT_REMOTE_PORT;
1292  }
1293  }
1294  return port;
1295 }
pcmk__cpg_host_t host
Definition: cpg.c:52
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:245
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition: io.c:524
#define ETIME
Definition: portability.h:111
#define PCMK__ENV_DH_MIN_BITS
uint32_t payload_compressed
Definition: remote.c:218
int pcmk__scan_min_int(const char *text, int *result, int minimum)
Definition: strings.c:127
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition: xml_io.c:488
size_t buffer_offset
Definition: ipc_internal.h:114
#define PCMK__ENV_REMOTE_PORT
uint32_t payload_uncompressed
Definition: remote.c:219
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:228
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1274
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:501
#define ENDIAN_LOCAL
Definition: remote.c:70
const char * pcmk__env_option(const char *option)
Definition: options.c:1088
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:212
void pcmk__sockaddr2str(const void *sa, char *s)
Definition: remote.c:1185
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:67
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:541
#define PCMK__ENV_TLS_PRIORITIES
#define crm_warn(fmt, args...)
Definition: logging.h:394
#define crm_debug(fmt, args...)
Definition: logging.h:402
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:649
#define crm_trace(fmt, args...)
Definition: logging.h:404
uint64_t id
Definition: remote.c:214
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1, 2)
#define __swab64(x)
Definition: remote.c:58
size_t buffer_size
Definition: ipc_internal.h:113
Wrappers for and extensions to libxml2.
#define PCMK__NELEM(a)
Definition: internal.h:48
long pcmk__get_sbd_watchdog_timeout(void)
Definition: watchdog.c:243
uint32_t payload_offset
Definition: remote.c:217
xmlNode * pcmk__xml_parse(const char *input)
Definition: xml_io.c:244
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:216
#define CRM_XS
Definition: logging.h:56
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition: results.c:906
#define __swab32(x)
Definition: remote.c:52
#define REMOTE_MSG_VERSION
Definition: remote.c:69
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
Definition: results.c:865
#define crm_err(fmt, args...)
Definition: logging.h:391
#define CRM_ASSERT(expr)
Definition: results.h:42
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:1214
#define pcmk_ok
Definition: results.h:69
#define PCMK__ENV_DH_MAX_BITS
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:1079
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:807
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:554
struct pcmk__remote_s * remote
Definition: ipc_internal.h:192
#define pcmk__assert_alloc(nmemb, size)
Definition: internal.h:297
unsigned int timeout
Definition: pcmk_fence.c:32
#define crm_info(fmt, args...)
Definition: logging.h:399
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
Definition: remote.c:501
uint32_t version
Definition: remote.c:213
uint64_t flags
Definition: remote.c:215