pacemaker  2.0.5-ba59be712
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2020 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12 
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h> /* X32T ~ PRIx32 */
27 
28 #include <glib.h>
29 #include <bzlib.h>
30 
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
35 
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39 
40 /* Swab macros from linux/swab.h */
#ifdef HAVE_LINUX_SWAB_H
#  include <linux/swab.h>
#else
/*
 * Portable fallbacks for the byte-swapping macros of <linux/swab.h>.
 *
 * The argument is first converted to the fixed-width type of the macro, then
 * each byte is shifted to its mirrored position and isolated with a mask.
 * The argument is evaluated more than once, so it must not have side effects.
 */
#define __swab16(x) ((uint16_t)( \
    ((((uint16_t)(x)) << 8) & 0xff00U) | \
    ((((uint16_t)(x)) >> 8) & 0x00ffU)))

#define __swab32(x) ((uint32_t)( \
    ((((uint32_t)(x)) << 24) & 0xff000000UL) | \
    ((((uint32_t)(x)) <<  8) & 0x00ff0000UL) | \
    ((((uint32_t)(x)) >>  8) & 0x0000ff00UL) | \
    ((((uint32_t)(x)) >> 24) & 0x000000ffUL)))

#define __swab64(x) ((uint64_t)( \
    ((((uint64_t)(x)) << 56) & 0xff00000000000000ULL) | \
    ((((uint64_t)(x)) << 40) & 0x00ff000000000000ULL) | \
    ((((uint64_t)(x)) << 24) & 0x0000ff0000000000ULL) | \
    ((((uint64_t)(x)) <<  8) & 0x000000ff00000000ULL) | \
    ((((uint64_t)(x)) >>  8) & 0x00000000ff000000ULL) | \
    ((((uint64_t)(x)) >> 24) & 0x0000000000ff0000ULL) | \
    ((((uint64_t)(x)) >> 40) & 0x000000000000ff00ULL) | \
    ((((uint64_t)(x)) >> 56) & 0x00000000000000ffULL)))
#endif
68 
/* Protocol version written to the header of every message sent by this file */
#define REMOTE_MSG_VERSION 1
/* Marker written to the "endian" header field of outgoing messages; a receiver
 * that sees the byte-swapped form of this value knows that the sender has the
 * opposite byte order and converts the header fields accordingly.
 */
#define ENDIAN_LOCAL 0xBADADBBD
71 
/*
 * Fixed-size header placed in front of the payload of every remote message.
 * The structure is packed because it is sent and received as raw bytes.
 */
struct remote_header_v0 {
    uint32_t endian; /* Detect messages from hosts with different endian-ness */
    uint32_t version;               /* Sender stores REMOTE_MSG_VERSION here */
    uint64_t id;                    /* Sender increments this for each message it sends */
    uint64_t flags;                 /* Not set by the sending code in this file (stays zero) */
    uint32_t size_total;            /* Header length plus payload length, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the start of the message */
    uint32_t payload_compressed;    /* If nonzero, payload is bzip2-compressed and this is its length */
    uint32_t payload_uncompressed;  /* Uncompressed payload length, including the terminating null */

    /* New fields get added here */

} __attribute__ ((packed));
85 
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100  struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101  if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102  return NULL;
103 
104  } else if(header->endian != ENDIAN_LOCAL) {
105  uint32_t endian = __swab32(header->endian);
106 
108  if(endian != ENDIAN_LOCAL) {
109  crm_err("Invalid message detected, endian mismatch: %" X32T
110  " is neither %" X32T " nor the swab'd %" X32T,
111  ENDIAN_LOCAL, header->endian, endian);
112  return NULL;
113  }
114 
115  header->id = __swab64(header->id);
116  header->flags = __swab64(header->flags);
117  header->endian = __swab32(header->endian);
118 
119  header->version = __swab32(header->version);
120  header->size_total = __swab32(header->size_total);
121  header->payload_offset = __swab32(header->payload_offset);
122  header->payload_compressed = __swab32(header->payload_compressed);
123  header->payload_uncompressed = __swab32(header->payload_uncompressed);
124  }
125 
126  return header;
127 }
128 
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130 
131 int
132 pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
133 {
134  int rc = 0;
135  int pollrc = 0;
136  time_t time_limit = time(NULL) + timeout_ms / 1000;
137 
138  do {
139  rc = gnutls_handshake(*remote->tls_session);
140  if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
141  pollrc = pcmk__remote_ready(remote, 1000);
142  if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
143  /* poll returned error, there is no hope */
144  crm_trace("TLS handshake poll failed: %s (%d)",
145  pcmk_strerror(pollrc), pollrc);
146  return pcmk_legacy2rc(pollrc);
147  }
148  } else if (rc < 0) {
149  crm_trace("TLS handshake failed: %s (%d)",
150  gnutls_strerror(rc), rc);
151  return EPROTO;
152  } else {
153  return pcmk_rc_ok;
154  }
155  } while (time(NULL) < time_limit);
156  return ETIME;
157 }
158 
165 static void
166 set_minimum_dh_bits(gnutls_session_t *session)
167 {
168  const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
169 
170  if (dh_min_bits_s) {
171  int dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
172 
173  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
174  * the priority string imply the DH requirements, but this is the only
175  * way to give the user control over compatibility with older servers.
176  */
177  if (dh_min_bits > 0) {
178  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
179  dh_min_bits);
180  gnutls_dh_set_prime_bits(*session, dh_min_bits);
181  }
182  }
183 }
184 
/*!
 * \internal
 * \brief Clamp a Diffie-Hellman prime size to the user-configured limits
 *
 * The limits come from the PCMK_dh_min_bits and PCMK_dh_max_bits environment
 * variables; a limit that is unset or not positive is not applied, and a
 * maximum below the minimum is ignored with a warning.
 *
 * \param[in] dh_bits  Prime size that would be used without any limits
 *
 * \return \p dh_bits, raised to the minimum or lowered to the maximum
 */
static unsigned int
get_bound_dh_bits(unsigned int dh_bits)
{
    const char *min_text = getenv("PCMK_dh_min_bits");
    const char *max_text = getenv("PCMK_dh_max_bits");
    int lower = (min_text == NULL)? 0 : crm_parse_int(min_text, "0");
    int upper = (max_text == NULL)? 0 : crm_parse_int(max_text, "0");

    if ((lower > 0) && (upper > 0) && (upper < lower)) {
        crm_warn("Ignoring PCMK_dh_max_bits because it is less than PCMK_dh_min_bits");
        upper = 0;
    }

    // The minimum takes precedence over the maximum
    if ((lower > 0) && (dh_bits < lower)) {
        return lower;
    }
    if ((upper > 0) && (dh_bits > upper)) {
        return upper;
    }
    return dh_bits;
}
212 
225 pcmk__new_tls_session(int csock, unsigned int conn_type,
226  gnutls_credentials_type_t cred_type, void *credentials)
227 {
228  int rc = GNUTLS_E_SUCCESS;
229  const char *prio_base = NULL;
230  char *prio = NULL;
231  gnutls_session_t *session = NULL;
232 
233  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
234  * values required for its functionality.
235  *
236  * For an example of anonymous authentication, see:
237  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
238  */
239 
240  prio_base = getenv("PCMK_tls_priorities");
241  if (prio_base == NULL) {
242  prio_base = PCMK_GNUTLS_PRIORITIES;
243  }
244  prio = crm_strdup_printf("%s:%s", prio_base,
245  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
246 
247  session = gnutls_malloc(sizeof(gnutls_session_t));
248  if (session == NULL) {
249  rc = GNUTLS_E_MEMORY_ERROR;
250  goto error;
251  }
252 
253  rc = gnutls_init(session, conn_type);
254  if (rc != GNUTLS_E_SUCCESS) {
255  goto error;
256  }
257 
258  /* @TODO On the server side, it would be more efficient to cache the
259  * priority with gnutls_priority_init2() and set it with
260  * gnutls_priority_set() for all sessions.
261  */
262  rc = gnutls_priority_set_direct(*session, prio, NULL);
263  if (rc != GNUTLS_E_SUCCESS) {
264  goto error;
265  }
266  if (conn_type == GNUTLS_CLIENT) {
267  set_minimum_dh_bits(session);
268  }
269 
270  gnutls_transport_set_ptr(*session,
271  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
272 
273  rc = gnutls_credentials_set(*session, cred_type, credentials);
274  if (rc != GNUTLS_E_SUCCESS) {
275  goto error;
276  }
277  free(prio);
278  return session;
279 
280 error:
281  crm_err("Could not initialize %s TLS %s session: %s "
282  CRM_XS " rc=%d priority='%s'",
283  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
284  (conn_type == GNUTLS_SERVER)? "server" : "client",
285  gnutls_strerror(rc), rc, prio);
286  free(prio);
287  if (session != NULL) {
288  gnutls_free(session);
289  }
290  return NULL;
291 }
292 
308 int
309 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
310 {
311  int rc = GNUTLS_E_SUCCESS;
312  unsigned int dh_bits = 0;
313 
314  rc = gnutls_dh_params_init(dh_params);
315  if (rc != GNUTLS_E_SUCCESS) {
316  goto error;
317  }
318 
319 #ifdef HAVE_GNUTLS_SEC_PARAM_TO_PK_BITS
320  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
321  GNUTLS_SEC_PARAM_NORMAL);
322  if (dh_bits == 0) {
323  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
324  goto error;
325  }
326 #else
327  dh_bits = 1024;
328 #endif
329  dh_bits = get_bound_dh_bits(dh_bits);
330 
331  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
332  dh_bits);
333  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
334  if (rc != GNUTLS_E_SUCCESS) {
335  goto error;
336  }
337 
338  return pcmk_rc_ok;
339 
340 error:
341  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
342  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
343  return EPROTO;
344 }
345 
357 int
358 pcmk__read_handshake_data(pcmk__client_t *client)
359 {
360  int rc = 0;
361 
362  CRM_ASSERT(client && client->remote && client->remote->tls_session);
363 
364  do {
365  rc = gnutls_handshake(*client->remote->tls_session);
366  } while (rc == GNUTLS_E_INTERRUPTED);
367 
368  if (rc == GNUTLS_E_AGAIN) {
369  /* No more data is available at the moment. This function should be
370  * invoked again once the client sends more.
371  */
372  return EAGAIN;
373  } else if (rc != GNUTLS_E_SUCCESS) {
374  crm_err("TLS handshake with remote client failed: %s "
375  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
376  return EPROTO;
377  }
378  return pcmk_rc_ok;
379 }
380 
381 // \return Standard Pacemaker return code
382 static int
383 send_tls(gnutls_session_t *session, struct iovec *iov)
384 {
385  const char *unsent = iov->iov_base;
386  size_t unsent_len = iov->iov_len;
387  ssize_t gnutls_rc;
388 
389  if (unsent == NULL) {
390  return EINVAL;
391  }
392 
393  crm_trace("Sending TLS message of %llu bytes",
394  (unsigned long long) unsent_len);
395  while (true) {
396  gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
397 
398  if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
399  crm_trace("Retrying to send %llu bytes remaining",
400  (unsigned long long) unsent_len);
401 
402  } else if (gnutls_rc < 0) {
403  // Caller can log as error if necessary
404  crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
405  gnutls_strerror((int) gnutls_rc),
406  (long long) gnutls_rc);
407  return ECONNABORTED;
408 
409  } else if (gnutls_rc < unsent_len) {
410  crm_trace("Sent %lld of %llu bytes remaining",
411  (long long) gnutls_rc, (unsigned long long) unsent_len);
412  unsent_len -= gnutls_rc;
413  unsent += gnutls_rc;
414  } else {
415  crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
416  break;
417  }
418  }
419  return pcmk_rc_ok;
420 }
421 #endif
422 
423 // \return Standard Pacemaker return code
424 static int
425 send_plaintext(int sock, struct iovec *iov)
426 {
427  const char *unsent = iov->iov_base;
428  size_t unsent_len = iov->iov_len;
429  ssize_t write_rc;
430 
431  if (unsent == NULL) {
432  return EINVAL;
433  }
434 
435  crm_debug("Sending plaintext message of %llu bytes to socket %d",
436  (unsigned long long) unsent_len, sock);
437  while (true) {
438  write_rc = write(sock, unsent, unsent_len);
439  if (write_rc < 0) {
440  int rc = errno;
441 
442  if ((errno == EINTR) || (errno == EAGAIN)) {
443  crm_trace("Retrying to send %llu bytes remaining to socket %d",
444  (unsigned long long) unsent_len, sock);
445  continue;
446  }
447 
448  // Caller can log as error if necessary
449  crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
450  pcmk_rc_str(rc), rc, sock);
451  return rc;
452 
453  } else if (write_rc < unsent_len) {
454  crm_trace("Sent %lld of %llu bytes remaining",
455  (long long) write_rc, (unsigned long long) unsent_len);
456  unsent += write_rc;
457  unsent_len -= write_rc;
458  continue;
459 
460  } else {
461  crm_trace("Sent all %lld bytes remaining: %.100s",
462  (long long) write_rc, (char *) (iov->iov_base));
463  break;
464  }
465  }
466  return pcmk_rc_ok;
467 }
468 
469 // \return Standard Pacemaker return code
470 static int
471 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
472 {
473  int rc = pcmk_rc_ok;
474 
475  for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
476 #ifdef HAVE_GNUTLS_GNUTLS_H
477  if (remote->tls_session) {
478  rc = send_tls(remote->tls_session, &(iov[lpc]));
479  continue;
480  }
481 #endif
482  if (remote->tcp_socket) {
483  rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
484  } else {
485  rc = ESOCKTNOSUPPORT;
486  }
487  }
488  return rc;
489 }
490 
500 int
501 pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg)
502 {
503  int rc = pcmk_rc_ok;
504  static uint64_t id = 0;
505  char *xml_text = NULL;
506 
507  struct iovec iov[2];
508  struct remote_header_v0 *header;
509 
510  CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
511 
512  xml_text = dump_xml_unformatted(msg);
513  CRM_CHECK(xml_text != NULL, return EINVAL);
514 
515  header = calloc(1, sizeof(struct remote_header_v0));
516  CRM_ASSERT(header != NULL);
517 
518  iov[0].iov_base = header;
519  iov[0].iov_len = sizeof(struct remote_header_v0);
520 
521  iov[1].iov_base = xml_text;
522  iov[1].iov_len = 1 + strlen(xml_text);
523 
524  id++;
525  header->id = id;
526  header->endian = ENDIAN_LOCAL;
527  header->version = REMOTE_MSG_VERSION;
528  header->payload_offset = iov[0].iov_len;
529  header->payload_uncompressed = iov[1].iov_len;
530  header->size_total = iov[0].iov_len + iov[1].iov_len;
531 
532  rc = remote_send_iovs(remote, iov, 2);
533  if (rc != pcmk_rc_ok) {
534  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
535  pcmk_rc_str(rc), rc);
536  }
537 
538  free(iov[0].iov_base);
539  free(iov[1].iov_base);
540  return rc;
541 }
542 
552 xmlNode *
554 {
555  xmlNode *xml = NULL;
556  struct remote_header_v0 *header = localized_remote_header(remote);
557 
558  if (header == NULL) {
559  return NULL;
560  }
561 
562  /* Support compression on the receiving end now, in case we ever want to add it later */
563  if (header->payload_compressed) {
564  int rc = 0;
565  unsigned int size_u = 1 + header->payload_uncompressed;
566  char *uncompressed = calloc(1, header->payload_offset + size_u);
567 
568  crm_trace("Decompressing message data %d bytes into %d bytes",
569  header->payload_compressed, size_u);
570 
571  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
572  remote->buffer + header->payload_offset,
573  header->payload_compressed, 1, 0);
574 
575  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
576  crm_warn("Couldn't decompress v%d message, we only understand v%d",
577  header->version, REMOTE_MSG_VERSION);
578  free(uncompressed);
579  return NULL;
580 
581  } else if (rc != BZ_OK) {
582  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
583  bz2_strerror(rc), rc);
584  free(uncompressed);
585  return NULL;
586  }
587 
588  CRM_ASSERT(size_u == header->payload_uncompressed);
589 
590  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
591  remote->buffer_size = header->payload_offset + size_u;
592 
593  free(remote->buffer);
594  remote->buffer = uncompressed;
595  header = localized_remote_header(remote);
596  }
597 
598  /* take ownership of the buffer */
599  remote->buffer_offset = 0;
600 
601  CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
602 
603  xml = string2xml(remote->buffer + header->payload_offset);
604  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
605  crm_warn("Couldn't parse v%d message, we only understand v%d",
606  header->version, REMOTE_MSG_VERSION);
607 
608  } else if (xml == NULL) {
609  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
610  }
611 
612  return xml;
613 }
614 
615 static int
616 get_remote_socket(pcmk__remote_t *remote)
617 {
618 #ifdef HAVE_GNUTLS_GNUTLS_H
619  if (remote->tls_session) {
620  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
621 
622  return GPOINTER_TO_INT(sock_ptr);
623  }
624 #endif
625 
626  if (remote->tcp_socket) {
627  return remote->tcp_socket;
628  }
629 
630  crm_err("Remote connection type undetermined (bug?)");
631  return -1;
632 }
633 
645 int
646 pcmk__remote_ready(pcmk__remote_t *remote, int timeout_ms)
647 {
648  struct pollfd fds = { 0, };
649  int sock = 0;
650  int rc = 0;
651  time_t start;
652  int timeout = timeout_ms;
653 
654  sock = get_remote_socket(remote);
655  if (sock <= 0) {
656  crm_trace("No longer connected");
657  return ENOTCONN;
658  }
659 
660  start = time(NULL);
661  errno = 0;
662  do {
663  fds.fd = sock;
664  fds.events = POLLIN;
665 
666  /* If we got an EINTR while polling, and we have a
667  * specific timeout we are trying to honor, attempt
668  * to adjust the timeout to the closest second. */
669  if (errno == EINTR && (timeout > 0)) {
670  timeout = timeout_ms - ((time(NULL) - start) * 1000);
671  if (timeout < 1000) {
672  timeout = 1000;
673  }
674  }
675 
676  rc = poll(&fds, 1, timeout);
677  } while (rc < 0 && errno == EINTR);
678 
679  if (rc < 0) {
680  return errno;
681  }
682  return (rc == 0)? ETIME : pcmk_rc_ok;
683 }
684 
697 static int
698 read_available_remote_data(pcmk__remote_t *remote)
699 {
700  int rc = pcmk_rc_ok;
701  size_t read_len = sizeof(struct remote_header_v0);
702  struct remote_header_v0 *header = localized_remote_header(remote);
703  bool received = false;
704  ssize_t read_rc;
705 
706  if(header) {
707  /* Stop at the end of the current message */
708  read_len = header->size_total;
709  }
710 
711  /* automatically grow the buffer when needed */
712  if(remote->buffer_size < read_len) {
713  remote->buffer_size = 2 * read_len;
714  crm_trace("Expanding buffer to %llu bytes",
715  (unsigned long long) remote->buffer_size);
716  remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
717  }
718 
719 #ifdef HAVE_GNUTLS_GNUTLS_H
720  if (!received && remote->tls_session) {
721  read_rc = gnutls_record_recv(*(remote->tls_session),
722  remote->buffer + remote->buffer_offset,
723  remote->buffer_size - remote->buffer_offset);
724  if (read_rc == GNUTLS_E_INTERRUPTED) {
725  rc = EINTR;
726  } else if (read_rc == GNUTLS_E_AGAIN) {
727  rc = EAGAIN;
728  } else if (read_rc < 0) {
729  crm_debug("TLS receive failed: %s (%lld)",
730  gnutls_strerror(read_rc), (long long) read_rc);
731  rc = EIO;
732  }
733  received = true;
734  }
735 #endif
736 
737  if (!received && remote->tcp_socket) {
738  read_rc = read(remote->tcp_socket,
739  remote->buffer + remote->buffer_offset,
740  remote->buffer_size - remote->buffer_offset);
741  if (read_rc < 0) {
742  rc = errno;
743  }
744  received = true;
745  }
746 
747  if (!received) {
748  crm_err("Remote connection type undetermined (bug?)");
749  return ESOCKTNOSUPPORT;
750  }
751 
752  /* process any errors. */
753  if (read_rc > 0) {
754  remote->buffer_offset += read_rc;
755  /* always null terminate buffer, the +1 to alloc always allows for this. */
756  remote->buffer[remote->buffer_offset] = '\0';
757  crm_trace("Received %lld more bytes (%llu total)",
758  (long long) read_rc,
759  (unsigned long long) remote->buffer_offset);
760 
761  } else if ((rc == EINTR) || (rc == EAGAIN)) {
762  crm_trace("No data available for non-blocking remote read: %s (%d)",
763  pcmk_rc_str(rc), rc);
764 
765  } else if (read_rc == 0) {
766  crm_debug("End of remote data encountered after %llu bytes",
767  (unsigned long long) remote->buffer_offset);
768  return ENOTCONN;
769 
770  } else {
771  crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
772  (unsigned long long) remote->buffer_offset,
773  pcmk_rc_str(rc), rc);
774  return ENOTCONN;
775  }
776 
777  header = localized_remote_header(remote);
778  if(header) {
779  if(remote->buffer_offset < header->size_total) {
780  crm_trace("Read partial remote message (%llu of %u bytes)",
781  (unsigned long long) remote->buffer_offset,
782  header->size_total);
783  } else {
784  crm_trace("Read full remote message of %llu bytes",
785  (unsigned long long) remote->buffer_offset);
786  return pcmk_rc_ok;
787  }
788  }
789 
790  return EAGAIN;
791 }
792 
803 int
805 {
806  int rc = pcmk_rc_ok;
807  time_t start = time(NULL);
808  int remaining_timeout = 0;
809 
810  if (timeout_ms == 0) {
811  timeout_ms = 10000;
812  } else if (timeout_ms < 0) {
813  timeout_ms = 60000;
814  }
815 
816  remaining_timeout = timeout_ms;
817  while (remaining_timeout > 0) {
818 
819  crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
820  remaining_timeout, timeout_ms);
821  rc = pcmk__remote_ready(remote, remaining_timeout);
822 
823  if (rc == ETIME) {
824  crm_err("Timed out (%d ms) while waiting for remote data",
825  remaining_timeout);
826  return rc;
827 
828  } else if (rc != pcmk_rc_ok) {
829  crm_debug("Wait for remote data aborted (will retry): %s "
830  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
831 
832  } else {
833  rc = read_available_remote_data(remote);
834  if (rc == pcmk_rc_ok) {
835  return rc;
836  } else if (rc == EAGAIN) {
837  crm_trace("Waiting for more remote data");
838  } else {
839  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
840  pcmk_rc_str(rc), rc);
841  }
842  }
843 
844  // Don't waste time retrying after fatal errors
845  if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
846  return rc;
847  }
848 
849  remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
850  }
851  return ETIME;
852 }
853 
/* State carried by the timer that checks whether an asynchronous connect()
 * has finished; allocated when the connect is started and freed by the timer
 * callback once it stops rescheduling itself.
 */
struct tcp_async_cb_data {
    int sock;           /* Socket being connected */
    int timeout_ms;     /* How long to keep waiting for the connection */
    time_t start;       /* When the connect began, or 0 if it succeeded immediately */
    void *userdata;     /* Passed through to the callback unchanged */
    void (*callback) (void *userdata, int rc, int sock); /* Called with the final result */
};
861 
862 // \return TRUE if timer should be rescheduled, FALSE otherwise
863 static gboolean
864 check_connect_finished(gpointer userdata)
865 {
866  struct tcp_async_cb_data *cb_data = userdata;
867  int rc;
868 
869  fd_set rset, wset;
870  struct timeval ts = { 0, };
871 
872  if (cb_data->start == 0) {
873  // Last connect() returned success immediately
874  rc = pcmk_rc_ok;
875  goto dispatch_done;
876  }
877 
878  // If the socket is ready for reading or writing, the connect succeeded
879  FD_ZERO(&rset);
880  FD_SET(cb_data->sock, &rset);
881  wset = rset;
882  rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
883 
884  if (rc < 0) { // select() error
885  rc = errno;
886  if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
887  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
888  return TRUE; // There is time left, so reschedule timer
889  } else {
890  rc = ETIMEDOUT;
891  }
892  }
893  crm_trace("Could not check socket %d for connection success: %s (%d)",
894  cb_data->sock, pcmk_rc_str(rc), rc);
895 
896  } else if (rc == 0) { // select() timeout
897  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
898  return TRUE; // There is time left, so reschedule timer
899  }
900  crm_debug("Timed out while waiting for socket %d connection success",
901  cb_data->sock);
902  rc = ETIMEDOUT;
903 
904  // select() returned number of file descriptors that are ready
905 
906  } else if (FD_ISSET(cb_data->sock, &rset)
907  || FD_ISSET(cb_data->sock, &wset)) {
908 
909  // The socket is ready; check it for connection errors
910  int error = 0;
911  socklen_t len = sizeof(error);
912 
913  if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
914  rc = errno;
915  crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
916  cb_data->sock, pcmk_rc_str(rc), rc);
917  } else if (error != 0) {
918  rc = error;
919  crm_trace("Socket %d connected with error: %s (%d)",
920  cb_data->sock, pcmk_rc_str(rc), rc);
921  } else {
922  rc = pcmk_rc_ok;
923  }
924 
925  } else { // Should not be possible
926  crm_trace("select() succeeded, but socket %d not in resulting "
927  "read/write sets", cb_data->sock);
928  rc = EAGAIN;
929  }
930 
931  dispatch_done:
932  if (rc == pcmk_rc_ok) {
933  crm_trace("Socket %d is connected", cb_data->sock);
934  } else {
935  close(cb_data->sock);
936  cb_data->sock = -1;
937  }
938 
939  if (cb_data->callback) {
940  cb_data->callback(cb_data->userdata, rc, cb_data->sock);
941  }
942  free(cb_data);
943  return FALSE; // Do not reschedule timer
944 }
945 
964 static int
965 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
966  int timeout_ms, int *timer_id, void *userdata,
967  void (*callback) (void *userdata, int rc, int sock))
968 {
969  int rc = 0;
970  int interval = 500;
971  int timer;
972  struct tcp_async_cb_data *cb_data = NULL;
973 
974  rc = pcmk__set_nonblocking(sock);
975  if (rc != pcmk_rc_ok) {
976  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
977  pcmk_rc_str(rc), rc);
978  return rc;
979  }
980 
981  rc = connect(sock, addr, addrlen);
982  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
983  rc = errno;
984  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
985  pcmk_rc_str(rc), rc);
986  return rc;
987  }
988 
989  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
990  cb_data->userdata = userdata;
991  cb_data->callback = callback;
992  cb_data->sock = sock;
993  cb_data->timeout_ms = timeout_ms;
994 
995  if (rc == 0) {
996  /* The connect was successful immediately, we still return to mainloop
997  * and let this callback get called later. This avoids the user of this api
998  * to have to account for the fact the callback could be invoked within this
999  * function before returning. */
1000  cb_data->start = 0;
1001  interval = 1;
1002  } else {
1003  cb_data->start = time(NULL);
1004  }
1005 
1006  /* This timer function does a non-blocking poll on the socket to see if we
1007  * can use it. Once we can, the connect has completed. This method allows us
1008  * to connect without blocking the mainloop.
1009  *
1010  * @TODO Use a mainloop fd callback for this instead of polling. Something
1011  * about the way mainloop is currently polling prevents this from
1012  * working at the moment though. (See connect(2) regarding EINPROGRESS
1013  * for possible new handling needed.)
1014  */
1015  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1016  interval, sock);
1017  timer = g_timeout_add(interval, check_connect_finished, cb_data);
1018  if (timer_id) {
1019  *timer_id = timer;
1020  }
1021 
1022  // timer callback should be taking care of cb_data
1023  // cppcheck-suppress memleak
1024  return pcmk_rc_ok;
1025 }
1026 
1037 static int
1038 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1039 {
1040  int rc = connect(sock, addr, addrlen);
1041 
1042  if (rc < 0) {
1043  rc = errno;
1044  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1045  pcmk_rc_str(rc), rc);
1046  return rc;
1047  }
1048 
1049  rc = pcmk__set_nonblocking(sock);
1050  if (rc != pcmk_rc_ok) {
1051  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1052  pcmk_rc_str(rc), rc);
1053  return rc;
1054  }
1055 
1056  return pcmk_ok;
1057 }
1058 
/*!
 * \internal
 * \brief Connect to a remote host over TCP, trying each of its addresses
 *
 * With a callback, the connection is made asynchronously: the first address
 * whose attempt can be started is used, and the callback reports the outcome
 * later. Without one, addresses are tried synchronously until one connects.
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Time to allow an asynchronous attempt to complete
 *                       (passed on as milliseconds)
 * \param[out] timer_id  If not NULL, where to store the ID of the timer
 *                       checking an asynchronous attempt
 * \param[out] sock_fd   Where to store the socket (-1 if none is open)
 * \param[in]  userdata  Passed through to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and report here
 *
 * \return Standard Pacemaker return code; pcmk_rc_ok means the socket is
 *         connected (synchronous) or the attempt was started (asynchronous)
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;
    rc = getaddrinfo(server, NULL, &hints, &res);
    if (rc != 0) {
        crm_err("Unable to get IP address info for %s: %s",
                server, gai_strerror(rc));
        rc = ENOTCONN;
        goto async_cleanup;
    }
    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            // Use the same entry that was just checked for a name
            server = rp->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, DIMOF(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* Store the result of each attempt in rc, so that a success on a
         * later address is not reported with the ENOTCONN left over from an
         * earlier address that failed.
         */
        if (callback) {
            rc = connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
                                      timeout, timer_id, userdata, callback);
            if (rc == pcmk_rc_ok) {
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else {
            rc = connect_socket_once(sock, rp->ai_addr, rp->ai_addrlen);
            if (rc == pcmk_rc_ok) {
                break;          /* Success */
            }
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1166 
/*!
 * \internal
 * \brief Convert an IPv4 or IPv6 socket address to its text form
 *
 * \param[in]  sa  Socket address (struct sockaddr_in or struct sockaddr_in6)
 * \param[out] s   Where to store the text; must have room for at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note For any other address family, "<invalid>" is stored instead.
 */
void
pcmk__sockaddr2str(void *sa, char *s)
{
    const struct sockaddr *generic = sa;

    if (generic->sa_family == AF_INET) {
        struct sockaddr_in *ipv4 = sa;

        inet_ntop(AF_INET, &(ipv4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (generic->sa_family == AF_INET6) {
        struct sockaddr_in6 *ipv6 = sa;

        inet_ntop(AF_INET6, &(ipv6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1196 
1206 int
1207 pcmk__accept_remote_connection(int ssock, int *csock)
1208 {
1209  int rc;
1210  struct sockaddr_storage addr;
1211  socklen_t laddr = sizeof(addr);
1212  char addr_str[INET6_ADDRSTRLEN];
1213 
1214  /* accept the connection */
1215  memset(&addr, 0, sizeof(addr));
1216  *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1217  if (*csock == -1) {
1218  rc = errno;
1219  crm_err("Could not accept remote client connection: %s "
1220  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1221  return rc;
1222  }
1223  pcmk__sockaddr2str(&addr, addr_str);
1224  crm_info("Accepted new remote client connection from %s", addr_str);
1225 
1226  rc = pcmk__set_nonblocking(*csock);
1227  if (rc != pcmk_rc_ok) {
1228  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1229  pcmk_rc_str(rc), rc);
1230  close(*csock);
1231  *csock = -1;
1232  return rc;
1233  }
1234 
1235 #ifdef TCP_USER_TIMEOUT
1236  if (pcmk__get_sbd_timeout() > 0) {
1237  // Time to fail and retry before watchdog
1238  unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2;
1239 
1240  rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1241  &optval, sizeof(optval));
1242  if (rc < 0) {
1243  rc = errno;
1244  crm_err("Could not set TCP timeout to %d ms on remote connection: "
1245  "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1246  close(*csock);
1247  *csock = -1;
1248  return rc;
1249  }
1250  }
1251 #endif
1252 
1253  return rc;
1254 }
1255 
1261 int
1263 {
1264  static int port = 0;
1265 
1266  if (port == 0) {
1267  const char *env = getenv("PCMK_remote_port");
1268 
1269  if (env) {
1270  errno = 0;
1271  port = strtol(env, NULL, 10);
1272  if (errno || (port < 1) || (port > 65535)) {
1273  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1274  env, DEFAULT_REMOTE_PORT);
1275  port = DEFAULT_REMOTE_PORT;
1276  }
1277  } else {
1278  port = DEFAULT_REMOTE_PORT;
1279  }
1280  }
1281  return port;
1282 }
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:215
A dumping ground.
int pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg)
Definition: remote.c:501
const char * pcmk_strerror(int rc)
Definition: results.c:58
#define ETIME
Definition: portability.h:165
const char * bz2_strerror(int rc)
Definition: results.c:726
uint32_t payload_compressed
Definition: remote.c:152
size_t buffer_offset
Definition: ipc_internal.h:99
void pcmk__sockaddr2str(void *sa, char *s)
Definition: remote.c:1179
int crm_parse_int(const char *text, const char *default_text)
Parse an integer value from a string.
Definition: strings.c:134
AIS_Host host
Definition: internal.h:84
uint32_t payload_uncompressed
Definition: remote.c:153
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:199
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:420
#define ENDIAN_LOCAL
Definition: remote.c:70
int crm_default_remote_port()
Get the default remote connection TCP port on this host.
Definition: remote.c:1262
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:146
xmlNode * string2xml(const char *input)
Definition: xml.c:835
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:50
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:538
#define crm_warn(fmt, args...)
Definition: logging.h:348
int rc
Definition: pcmk_fence.c:35
#define crm_debug(fmt, args...)
Definition: logging.h:352
void gnutls_session_t
Definition: cib_remote.c:44
#define crm_trace(fmt, args...)
Definition: logging.h:353
uint64_t id
Definition: remote.c:148
#define __swab64(x)
Definition: remote.c:58
size_t buffer_size
Definition: ipc_internal.h:98
Wrappers for and extensions to libxml2.
int pcmk_legacy2rc(int legacy_rc)
Definition: results.c:450
uint32_t payload_offset
Definition: remote.c:151
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:150
#define CRM_XS
Definition: logging.h:54
#define X32T
Definition: config.h:636
#define __swab32(x)
Definition: remote.c:52
#define REMOTE_MSG_VERSION
Definition: remote.c:69
#define crm_err(fmt, args...)
Definition: logging.h:347
#define CRM_ASSERT(expr)
Definition: results.h:42
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2000
#define DIMOF(a)
Definition: crm.h:57
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:1207
#define pcmk_ok
Definition: results.h:67
int pcmk__remote_ready(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:646
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:1076
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:804
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:553
struct pcmk__remote_s * remote
Definition: ipc_internal.h:156
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
unsigned int timeout
Definition: pcmk_fence.c:32
#define crm_info(fmt, args...)
Definition: logging.h:350
long pcmk__get_sbd_timeout(void)
Definition: watchdog.c:234
uint32_t version
Definition: remote.c:147
uint64_t flags
Definition: remote.c:149
int pcmk__set_nonblocking(int fd)
Definition: io.c:536