pacemaker  2.1.7-0f7f88312f
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2023 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12 
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h> // PRIx32
27 
28 #include <glib.h>
29 #include <bzlib.h>
30 
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
35 
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39 
40 /* Byte-swap ("swab") macros: from linux/swab.h when available, otherwise the portable fallbacks below */
41 #ifdef HAVE_LINUX_SWAB_H
42 # include <linux/swab.h>
43 #else
44 /*
45  * casts are necessary for constants, because we never know for sure
46  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
 *
 * Each macro reverses the byte order of its argument (16, 32 or 64 bits).
 * The argument is evaluated several times, so it must not have side effects.
47  */
48 #define __swab16(x) ((uint16_t)( \
49  (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
50  (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
51 
52 #define __swab32(x) ((uint32_t)( \
53  (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
54  (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
55  (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
56  (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
57 
58 #define __swab64(x) ((uint64_t)( \
59  (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
60  (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
61  (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
62  (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
63  (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
64  (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
65  (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
66  (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
67 #endif
68 
/* Protocol version written into every outgoing message header */
69 #define REMOTE_MSG_VERSION 1
/* Marker written in the sender's native byte order; a receiver that reads the
 * byte-swapped value knows the header must be byte-swapped before use
 */
70 #define ENDIAN_LOCAL 0xBADADBBD
71 
/* Header preceding every remote message on the wire. It is packed so its
 * layout has no padding and is identical on sender and receiver.
 */
72 struct remote_header_v0 {
73  uint32_t endian; /* Detect messages from hosts with different endian-ness */
/* Protocol version of the sender (REMOTE_MSG_VERSION) */
74  uint32_t version;
/* Sender's message counter, incremented for each message sent */
75  uint64_t id;
/* Not set by the sender in this file (always 0) */
76  uint64_t flags;
/* Total message size in bytes: header plus payload */
77  uint32_t size_total;
/* Offset of the payload from the start of the message (the header size) */
78  uint32_t payload_offset;
/* Size of the compressed payload, or 0 if the payload is not compressed */
79  uint32_t payload_compressed;
/* Size of the uncompressed payload, including its terminating NUL byte */
80  uint32_t payload_uncompressed;
81 
82  /* New fields get added here */
83 
84 } __attribute__ ((packed));
85 
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100  struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101  if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102  return NULL;
103 
104  } else if(header->endian != ENDIAN_LOCAL) {
105  uint32_t endian = __swab32(header->endian);
106 
108  if(endian != ENDIAN_LOCAL) {
109  crm_err("Invalid message detected, endian mismatch: %" PRIx32
110  " is neither %" PRIx32 " nor the swab'd %" PRIx32,
111  ENDIAN_LOCAL, header->endian, endian);
112  return NULL;
113  }
114 
115  header->id = __swab64(header->id);
116  header->flags = __swab64(header->flags);
117  header->endian = __swab32(header->endian);
118 
119  header->version = __swab32(header->version);
120  header->size_total = __swab32(header->size_total);
121  header->payload_offset = __swab32(header->payload_offset);
122  header->payload_compressed = __swab32(header->payload_compressed);
123  header->payload_uncompressed = __swab32(header->payload_uncompressed);
124  }
125 
126  return header;
127 }
128 
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130 
131 int
132 pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
133 {
134  int rc = 0;
135  int pollrc = 0;
136  time_t time_limit = time(NULL) + timeout_ms / 1000;
137 
138  do {
139  rc = gnutls_handshake(*remote->tls_session);
140  if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
141  pollrc = pcmk__remote_ready(remote, 1000);
142  if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
143  /* poll returned error, there is no hope */
144  crm_trace("TLS handshake poll failed: %s (%d)",
145  pcmk_strerror(pollrc), pollrc);
146  return pcmk_legacy2rc(pollrc);
147  }
148  } else if (rc < 0) {
149  crm_trace("TLS handshake failed: %s (%d)",
150  gnutls_strerror(rc), rc);
151  return EPROTO;
152  } else {
153  return pcmk_rc_ok;
154  }
155  } while (time(NULL) < time_limit);
156  return ETIME;
157 }
158 
/*!
 * \internal
 * \brief Require a minimum Diffie-Hellman prime size for a client TLS session
 *
 * If the configured minimum (dh_min_bits) is positive, it is logged and passed
 * to gnutls_dh_set_prime_bits(); otherwise the session is left unchanged.
 *
 * \param[in] session  TLS session to configure (the pointed-to GnuTLS session
 *                     is modified; only the pointer is const)
 *
 * \note pcmk__new_tls_session() calls this only for GNUTLS_CLIENT sessions.
 */
165 static void
166 set_minimum_dh_bits(const gnutls_session_t *session)
167 {
168  int dh_min_bits;
169 
 /* NOTE(review): the statement that assigns dh_min_bits (original line 170)
  * is missing from this listing; only its trailing "0);" survives below.
  * Presumably it parses PCMK__ENV_DH_MIN_BITS with pcmk__scan_min_int() and
  * a minimum of 0 -- confirm against the real source.
  */
171  0);
172 
173  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
174  * the priority string imply the DH requirements, but this is the only
175  * way to give the user control over compatibility with older servers.
176  */
177  if (dh_min_bits > 0) {
178  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
179  dh_min_bits);
180  gnutls_dh_set_prime_bits(*session, dh_min_bits);
181  }
182 }
183 
/*!
 * \internal
 * \brief Clamp a Diffie-Hellman prime size to the configured bounds
 *
 * Each bound applies only if it is positive. If the maximum is smaller than
 * the minimum, a warning is logged and the maximum is ignored.
 *
 * \param[in] dh_bits  Proposed prime size, in bits
 *
 * \return \p dh_bits raised to the minimum or lowered to the maximum as needed
 */
184 static unsigned int
185 get_bound_dh_bits(unsigned int dh_bits)
186 {
187  int dh_min_bits;
188  int dh_max_bits;
189 
 /* NOTE(review): the statements assigning dh_min_bits and dh_max_bits
  * (original lines 190 and 192) are missing from this listing; only their
  * trailing "0);" fragments survive. Presumably they parse
  * PCMK__ENV_DH_MIN_BITS / PCMK__ENV_DH_MAX_BITS with pcmk__scan_min_int()
  * and a minimum of 0 -- confirm against the real source.
  */
191  0);
193  0);
194 
195  if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
196  crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
197  dh_max_bits = 0;
198  }
 /* These comparisons mix unsigned dh_bits with signed bounds; the "> 0"
  * guards make the implicit conversion to unsigned safe.
  */
199  if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
200  return dh_min_bits;
201  }
202  if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
203  return dh_max_bits;
204  }
205  return dh_bits;
206 }
207 
/*!
 * \internal
 * \brief Create and configure a new GnuTLS session for a socket
 *
 * \param[in] csock        Connected socket to use as the session's transport
 * \param[in] conn_type    GNUTLS_SERVER or GNUTLS_CLIENT (client sessions
 *                         also get the configured minimum DH prime size)
 * \param[in] cred_type    GNUTLS_CRD_ANON (adds "+ANON-DH" to the priority
 *                         string) or a PSK type (adds "+DHE-PSK:+PSK")
 * \param[in] credentials  Credentials passed to gnutls_credentials_set()
 *
 * \return Newly allocated (gnutls_malloc()) session on success, or NULL on
 *         error (which is logged). NOTE(review): the caller presumably owns
 *         the result and must gnutls_deinit() and gnutls_free() it -- confirm.
 */
219 gnutls_session_t *
220 pcmk__new_tls_session(int csock, unsigned int conn_type,
221  gnutls_credentials_type_t cred_type, void *credentials)
222 {
223  int rc = GNUTLS_E_SUCCESS;
224  const char *prio_base = NULL;
225  char *prio = NULL;
226  gnutls_session_t *session = NULL;
227 
228  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
229  * values required for its functionality.
230  *
231  * For an example of anonymous authentication, see:
232  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
233  */
234 
 /* NOTE(review): original line 235 is missing from this listing. Presumably
  * it sets prio_base from PCMK__ENV_TLS_PRIORITIES via pcmk__env_option();
  * as listed, prio_base would always be NULL here -- confirm.
  */
236  if (prio_base == NULL) {
237  prio_base = PCMK_GNUTLS_PRIORITIES;
238  }
239  prio = crm_strdup_printf("%s:%s", prio_base,
240  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
241 
242  session = gnutls_malloc(sizeof(gnutls_session_t));
243  if (session == NULL) {
244  rc = GNUTLS_E_MEMORY_ERROR;
245  goto error;
246  }
247 
248  rc = gnutls_init(session, conn_type);
249  if (rc != GNUTLS_E_SUCCESS) {
250  goto error;
251  }
252 
253  /* @TODO On the server side, it would be more efficient to cache the
254  * priority with gnutls_priority_init2() and set it with
255  * gnutls_priority_set() for all sessions.
256  */
257  rc = gnutls_priority_set_direct(*session, prio, NULL);
258  if (rc != GNUTLS_E_SUCCESS) {
259  goto error;
260  }
261  if (conn_type == GNUTLS_CLIENT) {
262  set_minimum_dh_bits(session);
263  }
264 
 /* The socket descriptor itself is stored as the transport pointer */
265  gnutls_transport_set_ptr(*session,
266  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
267 
268  rc = gnutls_credentials_set(*session, cred_type, credentials);
269  if (rc != GNUTLS_E_SUCCESS) {
270  goto error;
271  }
272  free(prio);
273  return session;
274 
275 error:
276  crm_err("Could not initialize %s TLS %s session: %s "
277  CRM_XS " rc=%d priority='%s'",
278  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
279  (conn_type == GNUTLS_SERVER)? "server" : "client",
280  gnutls_strerror(rc), rc, prio);
281  free(prio);
 /* NOTE(review): if gnutls_init() already succeeded, only the holder is
  * freed below and gnutls_deinit(*session) is never called, which appears
  * to leak the session's internal state -- confirm and consider deinit.
  */
282  if (session != NULL) {
283  gnutls_free(session);
284  }
285  return NULL;
286 }
287 
303 int
304 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
305 {
306  int rc = GNUTLS_E_SUCCESS;
307  unsigned int dh_bits = 0;
308 
309  rc = gnutls_dh_params_init(dh_params);
310  if (rc != GNUTLS_E_SUCCESS) {
311  goto error;
312  }
313 
314  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
315  GNUTLS_SEC_PARAM_NORMAL);
316  if (dh_bits == 0) {
317  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
318  goto error;
319  }
320  dh_bits = get_bound_dh_bits(dh_bits);
321 
322  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
323  dh_bits);
324  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
325  if (rc != GNUTLS_E_SUCCESS) {
326  goto error;
327  }
328 
329  return pcmk_rc_ok;
330 
331 error:
332  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
333  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
334  return EPROTO;
335 }
336 
348 int
349 pcmk__read_handshake_data(const pcmk__client_t *client)
350 {
351  int rc = 0;
352 
353  CRM_ASSERT(client && client->remote && client->remote->tls_session);
354 
355  do {
356  rc = gnutls_handshake(*client->remote->tls_session);
357  } while (rc == GNUTLS_E_INTERRUPTED);
358 
359  if (rc == GNUTLS_E_AGAIN) {
360  /* No more data is available at the moment. This function should be
361  * invoked again once the client sends more.
362  */
363  return EAGAIN;
364  } else if (rc != GNUTLS_E_SUCCESS) {
365  crm_err("TLS handshake with remote client failed: %s "
366  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
367  return EPROTO;
368  }
369  return pcmk_rc_ok;
370 }
371 
372 // \return Standard Pacemaker return code
373 static int
374 send_tls(gnutls_session_t *session, struct iovec *iov)
375 {
376  const char *unsent = iov->iov_base;
377  size_t unsent_len = iov->iov_len;
378  ssize_t gnutls_rc;
379 
380  if (unsent == NULL) {
381  return EINVAL;
382  }
383 
384  crm_trace("Sending TLS message of %llu bytes",
385  (unsigned long long) unsent_len);
386  while (true) {
387  gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
388 
389  if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
390  crm_trace("Retrying to send %llu bytes remaining",
391  (unsigned long long) unsent_len);
392 
393  } else if (gnutls_rc < 0) {
394  // Caller can log as error if necessary
395  crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
396  gnutls_strerror((int) gnutls_rc),
397  (long long) gnutls_rc);
398  return ECONNABORTED;
399 
400  } else if (gnutls_rc < unsent_len) {
401  crm_trace("Sent %lld of %llu bytes remaining",
402  (long long) gnutls_rc, (unsigned long long) unsent_len);
403  unsent_len -= gnutls_rc;
404  unsent += gnutls_rc;
405  } else {
406  crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
407  break;
408  }
409  }
410  return pcmk_rc_ok;
411 }
412 #endif
413 
414 // \return Standard Pacemaker return code
415 static int
416 send_plaintext(int sock, struct iovec *iov)
417 {
418  const char *unsent = iov->iov_base;
419  size_t unsent_len = iov->iov_len;
420  ssize_t write_rc;
421 
422  if (unsent == NULL) {
423  return EINVAL;
424  }
425 
426  crm_debug("Sending plaintext message of %llu bytes to socket %d",
427  (unsigned long long) unsent_len, sock);
428  while (true) {
429  write_rc = write(sock, unsent, unsent_len);
430  if (write_rc < 0) {
431  int rc = errno;
432 
433  if ((errno == EINTR) || (errno == EAGAIN)) {
434  crm_trace("Retrying to send %llu bytes remaining to socket %d",
435  (unsigned long long) unsent_len, sock);
436  continue;
437  }
438 
439  // Caller can log as error if necessary
440  crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
441  pcmk_rc_str(rc), rc, sock);
442  return rc;
443 
444  } else if (write_rc < unsent_len) {
445  crm_trace("Sent %lld of %llu bytes remaining",
446  (long long) write_rc, (unsigned long long) unsent_len);
447  unsent += write_rc;
448  unsent_len -= write_rc;
449  continue;
450 
451  } else {
452  crm_trace("Sent all %lld bytes remaining: %.100s",
453  (long long) write_rc, (char *) (iov->iov_base));
454  break;
455  }
456  }
457  return pcmk_rc_ok;
458 }
459 
460 // \return Standard Pacemaker return code
461 static int
462 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
463 {
464  int rc = pcmk_rc_ok;
465 
466  for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
467 #ifdef HAVE_GNUTLS_GNUTLS_H
468  if (remote->tls_session) {
469  rc = send_tls(remote->tls_session, &(iov[lpc]));
470  continue;
471  }
472 #endif
473  if (remote->tcp_socket) {
474  rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
475  } else {
476  rc = ESOCKTNOSUPPORT;
477  }
478  }
479  return rc;
480 }
481 
491 int
492 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
493 {
494  int rc = pcmk_rc_ok;
495  static uint64_t id = 0;
496  char *xml_text = NULL;
497 
498  struct iovec iov[2];
499  struct remote_header_v0 *header;
500 
501  CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
502 
503  xml_text = dump_xml_unformatted(msg);
504  CRM_CHECK(xml_text != NULL, return EINVAL);
505 
506  header = calloc(1, sizeof(struct remote_header_v0));
507  CRM_ASSERT(header != NULL);
508 
509  iov[0].iov_base = header;
510  iov[0].iov_len = sizeof(struct remote_header_v0);
511 
512  iov[1].iov_base = xml_text;
513  iov[1].iov_len = 1 + strlen(xml_text);
514 
515  id++;
516  header->id = id;
517  header->endian = ENDIAN_LOCAL;
518  header->version = REMOTE_MSG_VERSION;
519  header->payload_offset = iov[0].iov_len;
520  header->payload_uncompressed = iov[1].iov_len;
521  header->size_total = iov[0].iov_len + iov[1].iov_len;
522 
523  rc = remote_send_iovs(remote, iov, 2);
524  if (rc != pcmk_rc_ok) {
525  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
526  pcmk_rc_str(rc), rc);
527  }
528 
529  free(iov[0].iov_base);
530  free(iov[1].iov_base);
531  return rc;
532 }
533 
/*!
 * \internal
 * \brief Parse the message buffered in a remote connection as XML
 *
 * The buffered payload is decompressed first if the header says it is
 * compressed, in which case the connection's buffer is replaced. The buffer is
 * then marked consumed (buffer_offset reset to 0) whether or not parsing
 * succeeds.
 *
 * \return Parsed XML, or NULL if no complete header is buffered, the payload
 *         could not be decompressed, or the payload could not be parsed (all
 *         but the first are logged)
 *
 * NOTE(review): the signature line (original line 544) is missing from this
 * listing; the body uses a pcmk__remote_t-like parameter named `remote`.
 * Presumably this is pcmk__remote_parse_buffer() -- confirm.
 */
543 xmlNode *
545 {
546  xmlNode *xml = NULL;
547  struct remote_header_v0 *header = localized_remote_header(remote);
548 
549  if (header == NULL) {
550  return NULL;
551  }
552 
553  /* Support compression on the receiving end now, in case we ever want to add it later */
554  if (header->payload_compressed) {
555  int rc = 0;
 /* Room for the payload plus a terminating NUL; on success bzip2 updates
  * size_u to the actual decompressed length
  */
556  unsigned int size_u = 1 + header->payload_uncompressed;
 /* NOTE(review): this calloc() result is not checked for NULL */
557  char *uncompressed = calloc(1, header->payload_offset + size_u);
558 
559  crm_trace("Decompressing message data %d bytes into %d bytes",
560  header->payload_compressed, size_u);
561 
562  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
563  remote->buffer + header->payload_offset,
564  header->payload_compressed, 1, 0);
565  rc = pcmk__bzlib2rc(rc);
566 
567  if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
568  crm_warn("Couldn't decompress v%d message, we only understand v%d",
569  header->version, REMOTE_MSG_VERSION);
570  free(uncompressed);
571  return NULL;
572 
573  } else if (rc != pcmk_rc_ok) {
574  crm_err("Decompression failed: %s " CRM_XS " rc=%d",
575  pcmk_rc_str(rc), rc);
576  free(uncompressed);
577  return NULL;
578  }
579 
580  CRM_ASSERT(size_u == header->payload_uncompressed);
581 
582  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
 /* buffer_size excludes the extra NUL byte that was allocated above */
583  remote->buffer_size = header->payload_offset + size_u;
584 
585  free(remote->buffer);
586  remote->buffer = uncompressed;
 /* Re-point header into the new buffer (buffer_offset is unchanged) */
587  header = localized_remote_header(remote);
588  }
589 
590  /* take ownership of the buffer */
591  remote->buffer_offset = 0;
592 
 /* NOTE(review): this indexes from sizeof(struct remote_header_v0) rather
  * than header->payload_offset; the two are equal for messages sent by
  * pcmk__remote_send_xml(), which sets payload_offset to the header size.
  */
593  CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
594 
595  xml = string2xml(remote->buffer + header->payload_offset);
596  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
597  crm_warn("Couldn't parse v%d message, we only understand v%d",
598  header->version, REMOTE_MSG_VERSION);
599 
600  } else if (xml == NULL) {
601  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
602  }
603 
604  return xml;
605 }
606 
607 static int
608 get_remote_socket(const pcmk__remote_t *remote)
609 {
610 #ifdef HAVE_GNUTLS_GNUTLS_H
611  if (remote->tls_session) {
612  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
613 
614  return GPOINTER_TO_INT(sock_ptr);
615  }
616 #endif
617 
618  if (remote->tcp_socket) {
619  return remote->tcp_socket;
620  }
621 
622  crm_err("Remote connection type undetermined (bug?)");
623  return -1;
624 }
625 
637 int
638 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
639 {
640  struct pollfd fds = { 0, };
641  int sock = 0;
642  int rc = 0;
643  time_t start;
644  int timeout = timeout_ms;
645 
646  sock = get_remote_socket(remote);
647  if (sock <= 0) {
648  crm_trace("No longer connected");
649  return ENOTCONN;
650  }
651 
652  start = time(NULL);
653  errno = 0;
654  do {
655  fds.fd = sock;
656  fds.events = POLLIN;
657 
658  /* If we got an EINTR while polling, and we have a
659  * specific timeout we are trying to honor, attempt
660  * to adjust the timeout to the closest second. */
661  if (errno == EINTR && (timeout > 0)) {
662  timeout = timeout_ms - ((time(NULL) - start) * 1000);
663  if (timeout < 1000) {
664  timeout = 1000;
665  }
666  }
667 
668  rc = poll(&fds, 1, timeout);
669  } while (rc < 0 && errno == EINTR);
670 
671  if (rc < 0) {
672  return errno;
673  }
674  return (rc == 0)? ETIME : pcmk_rc_ok;
675 }
676 
/*!
 * \internal
 * \brief Read whatever data is currently available on a remote connection
 *
 * Data is appended to remote->buffer at remote->buffer_offset, and the buffer
 * is always kept NUL-terminated. The buffer is grown (to twice the needed size)
 * when it cannot hold a full header or, once a header is buffered, the full
 * message that header describes.
 *
 * \param[in,out] remote  Remote connection to read from
 *
 * \return pcmk_rc_ok if the buffer now holds at least one complete message,
 *         EAGAIN if more data is needed (including interrupted or
 *         would-block reads), ENOTCONN on end of data or a read error, or
 *         ESOCKTNOSUPPORT if the connection has neither TLS nor a TCP socket
 */
689 static int
690 read_available_remote_data(pcmk__remote_t *remote)
691 {
692  int rc = pcmk_rc_ok;
693  size_t read_len = sizeof(struct remote_header_v0);
694  struct remote_header_v0 *header = localized_remote_header(remote);
695  bool received = false;
696  ssize_t read_rc;
697 
698  if(header) {
699  /* Stop at the end of the current message */
700  read_len = header->size_total;
701  }
702 
 /* NOTE(review): size_total comes from the peer's header and is not
  * bounded here, so a peer can make this allocate up to about 8 GiB.
  */
703  /* automatically grow the buffer when needed */
704  if(remote->buffer_size < read_len) {
705  remote->buffer_size = 2 * read_len;
706  crm_trace("Expanding buffer to %llu bytes",
707  (unsigned long long) remote->buffer_size);
708  remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
709  }
710 
 /* NOTE(review): both reads below request all remaining buffer capacity,
  * not just the rest of the current message, so bytes of a following
  * message may end up in the buffer. Confirm whether those can be lost when
  * the buffer is later consumed (buffer_offset reset to 0). Limiting the
  * read size is not a simple fix: for TLS, unread decrypted data stays
  * inside GnuTLS, where poll() on the socket cannot see it.
  */
711 #ifdef HAVE_GNUTLS_GNUTLS_H
712  if (!received && remote->tls_session) {
713  read_rc = gnutls_record_recv(*(remote->tls_session),
714  remote->buffer + remote->buffer_offset,
715  remote->buffer_size - remote->buffer_offset);
716  if (read_rc == GNUTLS_E_INTERRUPTED) {
717  rc = EINTR;
718  } else if (read_rc == GNUTLS_E_AGAIN) {
719  rc = EAGAIN;
720  } else if (read_rc < 0) {
721  crm_debug("TLS receive failed: %s (%lld)",
722  gnutls_strerror(read_rc), (long long) read_rc);
723  rc = EIO;
724  }
725  received = true;
726  }
727 #endif
728 
729  if (!received && remote->tcp_socket) {
730  read_rc = read(remote->tcp_socket,
731  remote->buffer + remote->buffer_offset,
732  remote->buffer_size - remote->buffer_offset);
733  if (read_rc < 0) {
734  rc = errno;
735  }
736  received = true;
737  }
738 
739  if (!received) {
740  crm_err("Remote connection type undetermined (bug?)");
741  return ESOCKTNOSUPPORT;
742  }
743 
744  /* process any errors. */
745  if (read_rc > 0) {
746  remote->buffer_offset += read_rc;
747  /* always null terminate buffer, the +1 to alloc always allows for this. */
748  remote->buffer[remote->buffer_offset] = '\0';
749  crm_trace("Received %lld more bytes (%llu total)",
750  (long long) read_rc,
751  (unsigned long long) remote->buffer_offset);
752 
753  } else if ((rc == EINTR) || (rc == EAGAIN)) {
754  crm_trace("No data available for non-blocking remote read: %s (%d)",
755  pcmk_rc_str(rc), rc);
756 
757  } else if (read_rc == 0) {
758  crm_debug("End of remote data encountered after %llu bytes",
759  (unsigned long long) remote->buffer_offset);
760  return ENOTCONN;
761 
762  } else {
763  crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
764  (unsigned long long) remote->buffer_offset,
765  pcmk_rc_str(rc), rc);
766  return ENOTCONN;
767  }
768 
 /* Re-check the header: it may have just become complete, and the buffer
  * may have been reallocated above
  */
769  header = localized_remote_header(remote);
770  if(header) {
771  if(remote->buffer_offset < header->size_total) {
772  crm_trace("Read partial remote message (%llu of %u bytes)",
773  (unsigned long long) remote->buffer_offset,
774  header->size_total);
775  } else {
776  crm_trace("Read full remote message of %llu bytes",
777  (unsigned long long) remote->buffer_offset);
778  return pcmk_rc_ok;
779  }
780  }
781 
782  return EAGAIN;
783 }
784 
/*!
 * \internal
 * \brief Wait until a complete message has been read from a remote connection
 *
 * A timeout_ms of 0 means 10000 ms, and a negative value means 60000 ms.
 * Non-fatal errors are retried until the timeout expires. ENOTCONN and
 * ESOCKTNOSUPPORT end the wait immediately.
 *
 * \return pcmk_rc_ok once a full message is buffered, ETIME on timeout, or
 *         ENOTCONN / ESOCKTNOSUPPORT on a fatal connection error
 *
 * NOTE(review): the signature line (original line 796) is missing from this
 * listing; the body uses parameters named `remote` and `timeout_ms`.
 * Presumably this is pcmk__read_remote_message() -- confirm.
 */
795 int
797 {
798  int rc = pcmk_rc_ok;
799  time_t start = time(NULL);
800  int remaining_timeout = 0;
801 
802  if (timeout_ms == 0) {
803  timeout_ms = 10000;
804  } else if (timeout_ms < 0) {
805  timeout_ms = 60000;
806  }
807 
808  remaining_timeout = timeout_ms;
809  while (remaining_timeout > 0) {
810 
811  crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
812  remaining_timeout, timeout_ms);
813  rc = pcmk__remote_ready(remote, remaining_timeout);
814 
815  if (rc == ETIME) {
816  crm_err("Timed out (%d ms) while waiting for remote data",
817  remaining_timeout);
818  return rc;
819 
820  } else if (rc != pcmk_rc_ok) {
 /* NOTE(review): a non-fatal error here is retried immediately without
  * waiting, which could spin until the timeout expires
  */
821  crm_debug("Wait for remote data aborted (will retry): %s "
822  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
823 
824  } else {
825  rc = read_available_remote_data(remote);
826  if (rc == pcmk_rc_ok) {
827  return rc;
828  } else if (rc == EAGAIN) {
829  crm_trace("Waiting for more remote data");
830  } else {
831  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
832  pcmk_rc_str(rc), rc);
833  }
834  }
835 
836  // Don't waste time retrying after fatal errors
837  if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
838  return rc;
839  }
840 
 /* Elapsed time is measured with one-second granularity */
841  remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
842  }
843  return ETIME;
844 }
845 
/* State for an asynchronous (non-blocking) TCP connection attempt, passed to
 * the check_connect_finished() timer callback, which frees it when done
 */
846 struct tcp_async_cb_data {
/* Socket being connected (set to -1 once it is closed after a failure) */
847  int sock;
/* Total time allowed for the attempt, in milliseconds */
848  int timeout_ms;
/* When the attempt began, or 0 if connect() succeeded immediately */
849  time_t start;
/* Opaque data passed back to callback */
850  void *userdata;
/* Called once with (userdata, standard return code, socket or -1) */
851  void (*callback) (void *userdata, int rc, int sock);
852 };
853 
854 // \return TRUE if timer should be rescheduled, FALSE otherwise
855 static gboolean
856 check_connect_finished(gpointer userdata)
857 {
858  struct tcp_async_cb_data *cb_data = userdata;
859  int rc;
860 
861  fd_set rset, wset;
862  struct timeval ts = { 0, };
863 
864  if (cb_data->start == 0) {
865  // Last connect() returned success immediately
866  rc = pcmk_rc_ok;
867  goto dispatch_done;
868  }
869 
870  // If the socket is ready for reading or writing, the connect succeeded
871  FD_ZERO(&rset);
872  FD_SET(cb_data->sock, &rset);
873  wset = rset;
874  rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
875 
876  if (rc < 0) { // select() error
877  rc = errno;
878  if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
879  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
880  return TRUE; // There is time left, so reschedule timer
881  } else {
882  rc = ETIMEDOUT;
883  }
884  }
885  crm_trace("Could not check socket %d for connection success: %s (%d)",
886  cb_data->sock, pcmk_rc_str(rc), rc);
887 
888  } else if (rc == 0) { // select() timeout
889  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
890  return TRUE; // There is time left, so reschedule timer
891  }
892  crm_debug("Timed out while waiting for socket %d connection success",
893  cb_data->sock);
894  rc = ETIMEDOUT;
895 
896  // select() returned number of file descriptors that are ready
897 
898  } else if (FD_ISSET(cb_data->sock, &rset)
899  || FD_ISSET(cb_data->sock, &wset)) {
900 
901  // The socket is ready; check it for connection errors
902  int error = 0;
903  socklen_t len = sizeof(error);
904 
905  if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
906  rc = errno;
907  crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
908  cb_data->sock, pcmk_rc_str(rc), rc);
909  } else if (error != 0) {
910  rc = error;
911  crm_trace("Socket %d connected with error: %s (%d)",
912  cb_data->sock, pcmk_rc_str(rc), rc);
913  } else {
914  rc = pcmk_rc_ok;
915  }
916 
917  } else { // Should not be possible
918  crm_trace("select() succeeded, but socket %d not in resulting "
919  "read/write sets", cb_data->sock);
920  rc = EAGAIN;
921  }
922 
923  dispatch_done:
924  if (rc == pcmk_rc_ok) {
925  crm_trace("Socket %d is connected", cb_data->sock);
926  } else {
927  close(cb_data->sock);
928  cb_data->sock = -1;
929  }
930 
931  if (cb_data->callback) {
932  cb_data->callback(cb_data->userdata, rc, cb_data->sock);
933  }
934  free(cb_data);
935  return FALSE; // Do not reschedule timer
936 }
937 
956 static int
957 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
958  int timeout_ms, int *timer_id, void *userdata,
959  void (*callback) (void *userdata, int rc, int sock))
960 {
961  int rc = 0;
962  int interval = 500;
963  int timer;
964  struct tcp_async_cb_data *cb_data = NULL;
965 
966  rc = pcmk__set_nonblocking(sock);
967  if (rc != pcmk_rc_ok) {
968  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
969  pcmk_rc_str(rc), rc);
970  return rc;
971  }
972 
973  rc = connect(sock, addr, addrlen);
974  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
975  rc = errno;
976  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
977  pcmk_rc_str(rc), rc);
978  return rc;
979  }
980 
981  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
982  cb_data->userdata = userdata;
983  cb_data->callback = callback;
984  cb_data->sock = sock;
985  cb_data->timeout_ms = timeout_ms;
986 
987  if (rc == 0) {
988  /* The connect was successful immediately, we still return to mainloop
989  * and let this callback get called later. This avoids the user of this api
990  * to have to account for the fact the callback could be invoked within this
991  * function before returning. */
992  cb_data->start = 0;
993  interval = 1;
994  } else {
995  cb_data->start = time(NULL);
996  }
997 
998  /* This timer function does a non-blocking poll on the socket to see if we
999  * can use it. Once we can, the connect has completed. This method allows us
1000  * to connect without blocking the mainloop.
1001  *
1002  * @TODO Use a mainloop fd callback for this instead of polling. Something
1003  * about the way mainloop is currently polling prevents this from
1004  * working at the moment though. (See connect(2) regarding EINPROGRESS
1005  * for possible new handling needed.)
1006  */
1007  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1008  interval, sock);
1009  timer = g_timeout_add(interval, check_connect_finished, cb_data);
1010  if (timer_id) {
1011  *timer_id = timer;
1012  }
1013 
1014  // timer callback should be taking care of cb_data
1015  // cppcheck-suppress memleak
1016  return pcmk_rc_ok;
1017 }
1018 
1029 static int
1030 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1031 {
1032  int rc = connect(sock, addr, addrlen);
1033 
1034  if (rc < 0) {
1035  rc = errno;
1036  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1037  pcmk_rc_str(rc), rc);
1038  return rc;
1039  }
1040 
1041  rc = pcmk__set_nonblocking(sock);
1042  if (rc != pcmk_rc_ok) {
1043  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1044  pcmk_rc_str(rc), rc);
1045  return rc;
1046  }
1047 
1048  return pcmk_ok;
1049 }
1050 
/*!
 * \internal
 * \brief Connect to a remote host's TCP port
 *
 * Every address the host name resolves to is tried, in order, until a
 * connection (or, asynchronously, a connection attempt) succeeds.
 *
 * \param[in]  host      Name or IP address of host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   If asynchronous, fail if not connected in this many ms
 * \param[out] timer_id  If asynchronous and not NULL, set to the main loop
 *                       timer ID of the connection check
 * \param[out] sock_fd   Where to store the socket (-1 if none)
 * \param[in]  userdata  If asynchronous, data to pass to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and call this
 *                       once the attempt finishes
 *
 * \return Standard Pacemaker return code
 * \note In asynchronous mode, pcmk_rc_ok only means the attempt was started;
 *       \p callback reports the final outcome.
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        // Use the canonical name from the entry that was just checked
        if (rp->ai_canonname) {
            server = rp->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* On success, clear any error left over from an earlier address so
         * the caller does not get a failure code along with a valid socket
         */
        if (callback) {
            if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
                                     timer_id, userdata, callback) == pcmk_rc_ok) {
                rc = pcmk_rc_ok;
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else if (connect_socket_once(sock, rp->ai_addr,
                                       rp->ai_addrlen) == pcmk_rc_ok) {
            rc = pcmk_rc_ok;
            break;              /* Success */
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1161 
/*!
 * \internal
 * \brief Format an IPv4 or IPv6 socket address as a string for logging
 *
 * \param[in]  sa  Socket address (a struct sockaddr of any family)
 * \param[out] s   Storage for at least INET6_ADDRSTRLEN bytes; receives the
 *                 numeric address, or "<invalid>" for non-IP families
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const struct sockaddr *generic = sa;
    const void *raw_addr = NULL;

    if (generic->sa_family == AF_INET) {
        raw_addr = &(((const struct sockaddr_in *) sa)->sin_addr);
    } else if (generic->sa_family == AF_INET6) {
        raw_addr = &(((const struct sockaddr_in6 *) sa)->sin6_addr);
    }

    if (raw_addr == NULL) {
        strcpy(s, "<invalid>");
    } else {
        inet_ntop(generic->sa_family, raw_addr, s, INET6_ADDRSTRLEN);
    }
}
1192 
1202 int
1203 pcmk__accept_remote_connection(int ssock, int *csock)
1204 {
1205  int rc;
1206  struct sockaddr_storage addr;
1207  socklen_t laddr = sizeof(addr);
1208  char addr_str[INET6_ADDRSTRLEN];
1209 
1210  /* accept the connection */
1211  memset(&addr, 0, sizeof(addr));
1212  *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1213  if (*csock == -1) {
1214  rc = errno;
1215  crm_err("Could not accept remote client connection: %s "
1216  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1217  return rc;
1218  }
1219  pcmk__sockaddr2str(&addr, addr_str);
1220  crm_info("Accepted new remote client connection from %s", addr_str);
1221 
1222  rc = pcmk__set_nonblocking(*csock);
1223  if (rc != pcmk_rc_ok) {
1224  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1225  pcmk_rc_str(rc), rc);
1226  close(*csock);
1227  *csock = -1;
1228  return rc;
1229  }
1230 
1231 #ifdef TCP_USER_TIMEOUT
1232  if (pcmk__get_sbd_timeout() > 0) {
1233  // Time to fail and retry before watchdog
1234  unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2;
1235 
1236  rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1237  &optval, sizeof(optval));
1238  if (rc < 0) {
1239  rc = errno;
1240  crm_err("Could not set TCP timeout to %d ms on remote connection: "
1241  "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1242  close(*csock);
1243  *csock = -1;
1244  return rc;
1245  }
1246  }
1247 #endif
1248 
1249  return rc;
1250 }
1251 
/*!
 * \brief Get the default remote connection TCP port on this host
 *
 * The PCMK__ENV_REMOTE_PORT environment option is parsed as a base-10 number
 * on the first call, and the result is cached in a static variable. If the
 * option is unset, fails to parse (errno set), or is outside 1-65535,
 * DEFAULT_REMOTE_PORT is used instead (with a warning in the invalid case).
 *
 * \return TCP port number
 *
 * NOTE(review): the signature line (original line 1258,
 * "crm_default_remote_port(void)") is missing from this listing.
 */
1257 int
1259 {
1260  static int port = 0;
1261 
1262  if (port == 0) {
1263  const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1264 
1265  if (env) {
1266  errno = 0;
 /* NOTE(review): trailing garbage such as "3121abc" is accepted because
  * no end pointer is checked, and the long result is narrowed to int
  * before the range check, so an out-of-int-range value could wrap into
  * the valid range on LP64 systems.
  */
1267  port = strtol(env, NULL, 10);
1268  if (errno || (port < 1) || (port > 65535)) {
1269  crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1270  " has invalid value '%s', using %d instead",
1271  env, DEFAULT_REMOTE_PORT);
1272  port = DEFAULT_REMOTE_PORT;
1273  }
1274  } else {
1275  port = DEFAULT_REMOTE_PORT;
1276  }
1277  }
1278  return port;
1279 }
pcmk__cpg_host_t host
Definition: cpg.c:49
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:238
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition: io.c:524
const char * pcmk_strerror(int rc)
Definition: results.c:149
#define ETIME
Definition: portability.h:111
#define PCMK__ENV_DH_MIN_BITS
uint32_t payload_compressed
Definition: remote.c:218
int pcmk__scan_min_int(const char *text, int *result, int minimum)
Definition: strings.c:127
size_t buffer_offset
Definition: ipc_internal.h:114
#define PCMK__ENV_REMOTE_PORT
uint32_t payload_uncompressed
Definition: remote.c:219
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:222
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1258
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:501
#define ENDIAN_LOCAL
Definition: remote.c:70
const char * pcmk__env_option(const char *option)
Definition: options.c:58
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:212
xmlNode * string2xml(const char *input)
Definition: xml.c:800
void pcmk__sockaddr2str(const void *sa, char *s)
Definition: remote.c:1174
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:52
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:541
#define PCMK__ENV_TLS_PRIORITIES
#define crm_warn(fmt, args...)
Definition: logging.h:382
#define crm_debug(fmt, args...)
Definition: logging.h:386
char * dump_xml_unformatted(const xmlNode *xml)
Definition: xml.c:1662
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:638
#define crm_trace(fmt, args...)
Definition: logging.h:387
uint64_t id
Definition: remote.c:214
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define __swab64(x)
Definition: remote.c:58
size_t buffer_size
Definition: ipc_internal.h:113
Wrappers for and extensions to libxml2.
#define PCMK__NELEM(a)
Definition: internal.h:46
int pcmk_legacy2rc(int legacy_rc)
Definition: results.c:559
uint32_t payload_offset
Definition: remote.c:217
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:216
#define CRM_XS
Definition: logging.h:56
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition: results.c:906
#define __swab32(x)
Definition: remote.c:52
#define REMOTE_MSG_VERSION
Definition: remote.c:69
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
Definition: results.c:865
#define crm_err(fmt, args...)
Definition: logging.h:381
#define CRM_ASSERT(expr)
Definition: results.h:42
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:1203
#define pcmk_ok
Definition: results.h:68
#define PCMK__ENV_DH_MAX_BITS
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:1068
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:796
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:544
struct pcmk__remote_s * remote
Definition: ipc_internal.h:192
unsigned int timeout
Definition: pcmk_fence.c:32
#define crm_info(fmt, args...)
Definition: logging.h:384
long pcmk__get_sbd_timeout(void)
Definition: watchdog.c:235
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
Definition: remote.c:492
uint32_t version
Definition: remote.c:213
uint64_t flags
Definition: remote.c:215