pacemaker  2.1.3-ea053b43a
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2021 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12 
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h> /* X32T ~ PRIx32 */
27 
28 #include <glib.h>
29 #include <bzlib.h>
30 
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
35 
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39 
40 /* Swab macros from linux/swab.h */
41 #ifdef HAVE_LINUX_SWAB_H
42 # include <linux/swab.h>
43 #else
/*
 * Fallback byte-swapping macros for platforms without <linux/swab.h>.
 *
 * Operands and masks are cast explicitly, because there is no portable way to
 * know for sure how U/UL/ULL constants map to 16-, 32- and 64-bit types.
 * Each macro shifts the operand first and then masks the byte that landed in
 * the wanted position.
 */
#define __swab16(x) ((uint16_t) ( \
    ((uint16_t) (((uint16_t) (x)) << 8)) | \
    ((uint16_t) (((uint16_t) (x)) >> 8))))

#define __swab32(x) ((uint32_t) ( \
    (((uint32_t) (x)) << 24) | \
    ((((uint32_t) (x)) << 8) & (uint32_t) 0x00ff0000UL) | \
    ((((uint32_t) (x)) >> 8) & (uint32_t) 0x0000ff00UL) | \
    (((uint32_t) (x)) >> 24)))

#define __swab64(x) ((uint64_t) ( \
    (((uint64_t) (x)) << 56) | \
    ((((uint64_t) (x)) << 40) & (uint64_t) 0x00ff000000000000ULL) | \
    ((((uint64_t) (x)) << 24) & (uint64_t) 0x0000ff0000000000ULL) | \
    ((((uint64_t) (x)) << 8) & (uint64_t) 0x000000ff00000000ULL) | \
    ((((uint64_t) (x)) >> 8) & (uint64_t) 0x00000000ff000000ULL) | \
    ((((uint64_t) (x)) >> 24) & (uint64_t) 0x0000000000ff0000ULL) | \
    ((((uint64_t) (x)) >> 40) & (uint64_t) 0x000000000000ff00ULL) | \
    (((uint64_t) (x)) >> 56)))
67 #endif
68 
/* Value stored in the version field of headers built by this file */
#define REMOTE_MSG_VERSION 1

/* Marker stored in the endian field of headers built by this file; a reader
 * that sees the byte-swapped value knows the sender's byte order differs
 */
#define ENDIAN_LOCAL 0xBADADBBD

/*
 * Fixed-size header placed in front of every remote message payload.
 * The structure is packed, so its layout contains no padding.
 */
struct remote_header_v0 {
    /* Detect messages from hosts with different endian-ness */
    uint32_t endian;

    /* Message format version (compared against REMOTE_MSG_VERSION) */
    uint32_t version;

    /* Message ID (senders in this file use an incrementing counter) */
    uint64_t id;

    /* Message flags (byte-swapped when needed, otherwise unused in this file) */
    uint64_t flags;

    /* Size of header plus payload, in bytes */
    uint32_t size_total;

    /* Offset of the payload from the start of the message buffer */
    uint32_t payload_offset;

    /* Size of the compressed payload (zero if payload is not compressed) */
    uint32_t payload_compressed;

    /* Size of the uncompressed payload */
    uint32_t payload_uncompressed;

    /* New fields get added here */

} __attribute__ ((packed));
85 
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100  struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101  if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102  return NULL;
103 
104  } else if(header->endian != ENDIAN_LOCAL) {
105  uint32_t endian = __swab32(header->endian);
106 
108  if(endian != ENDIAN_LOCAL) {
109  crm_err("Invalid message detected, endian mismatch: %" X32T
110  " is neither %" X32T " nor the swab'd %" X32T,
111  ENDIAN_LOCAL, header->endian, endian);
112  return NULL;
113  }
114 
115  header->id = __swab64(header->id);
116  header->flags = __swab64(header->flags);
117  header->endian = __swab32(header->endian);
118 
119  header->version = __swab32(header->version);
120  header->size_total = __swab32(header->size_total);
121  header->payload_offset = __swab32(header->payload_offset);
122  header->payload_compressed = __swab32(header->payload_compressed);
123  header->payload_uncompressed = __swab32(header->payload_uncompressed);
124  }
125 
126  return header;
127 }
128 
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130 
131 int
132 pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
133 {
134  int rc = 0;
135  int pollrc = 0;
136  time_t time_limit = time(NULL) + timeout_ms / 1000;
137 
138  do {
139  rc = gnutls_handshake(*remote->tls_session);
140  if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
141  pollrc = pcmk__remote_ready(remote, 1000);
142  if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
143  /* poll returned error, there is no hope */
144  crm_trace("TLS handshake poll failed: %s (%d)",
145  pcmk_strerror(pollrc), pollrc);
146  return pcmk_legacy2rc(pollrc);
147  }
148  } else if (rc < 0) {
149  crm_trace("TLS handshake failed: %s (%d)",
150  gnutls_strerror(rc), rc);
151  return EPROTO;
152  } else {
153  return pcmk_rc_ok;
154  }
155  } while (time(NULL) < time_limit);
156  return ETIME;
157 }
158 
165 static void
166 set_minimum_dh_bits(gnutls_session_t *session)
167 {
168  int dh_min_bits;
169 
170  pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &dh_min_bits, 0);
171 
172  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
173  * the priority string imply the DH requirements, but this is the only
174  * way to give the user control over compatibility with older servers.
175  */
176  if (dh_min_bits > 0) {
177  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
178  dh_min_bits);
179  gnutls_dh_set_prime_bits(*session, dh_min_bits);
180  }
181 }
182 
/*!
 * \internal
 * \brief Clamp a Diffie-Hellman prime size to the user-configured bounds
 *
 * Bounds come from the PCMK_dh_min_bits and PCMK_dh_max_bits environment
 * variables. A bound that is not greater than zero is not applied, and a
 * maximum below the minimum is ignored with a warning.
 *
 * \param[in] dh_bits  Prime size to clamp
 *
 * \return \p dh_bits, raised to the minimum or lowered to the maximum
 */
static unsigned int
get_bound_dh_bits(unsigned int dh_bits)
{
    int lower;
    int upper;

    pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &lower, 0);
    pcmk__scan_min_int(getenv("PCMK_dh_max_bits"), &upper, 0);

    if ((upper > 0) && (upper < lower)) {
        crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
        upper = 0;
    }

    // The minimum takes precedence over the maximum
    if ((lower > 0) && (dh_bits < lower)) {
        dh_bits = lower;

    } else if ((upper > 0) && (dh_bits > upper)) {
        dh_bits = upper;
    }
    return dh_bits;
}
203 
216 pcmk__new_tls_session(int csock, unsigned int conn_type,
217  gnutls_credentials_type_t cred_type, void *credentials)
218 {
219  int rc = GNUTLS_E_SUCCESS;
220  const char *prio_base = NULL;
221  char *prio = NULL;
222  gnutls_session_t *session = NULL;
223 
224  /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
225  * values required for its functionality.
226  *
227  * For an example of anonymous authentication, see:
228  * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
229  */
230 
231  prio_base = getenv("PCMK_tls_priorities");
232  if (prio_base == NULL) {
233  prio_base = PCMK_GNUTLS_PRIORITIES;
234  }
235  prio = crm_strdup_printf("%s:%s", prio_base,
236  (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
237 
238  session = gnutls_malloc(sizeof(gnutls_session_t));
239  if (session == NULL) {
240  rc = GNUTLS_E_MEMORY_ERROR;
241  goto error;
242  }
243 
244  rc = gnutls_init(session, conn_type);
245  if (rc != GNUTLS_E_SUCCESS) {
246  goto error;
247  }
248 
249  /* @TODO On the server side, it would be more efficient to cache the
250  * priority with gnutls_priority_init2() and set it with
251  * gnutls_priority_set() for all sessions.
252  */
253  rc = gnutls_priority_set_direct(*session, prio, NULL);
254  if (rc != GNUTLS_E_SUCCESS) {
255  goto error;
256  }
257  if (conn_type == GNUTLS_CLIENT) {
258  set_minimum_dh_bits(session);
259  }
260 
261  gnutls_transport_set_ptr(*session,
262  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
263 
264  rc = gnutls_credentials_set(*session, cred_type, credentials);
265  if (rc != GNUTLS_E_SUCCESS) {
266  goto error;
267  }
268  free(prio);
269  return session;
270 
271 error:
272  crm_err("Could not initialize %s TLS %s session: %s "
273  CRM_XS " rc=%d priority='%s'",
274  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
275  (conn_type == GNUTLS_SERVER)? "server" : "client",
276  gnutls_strerror(rc), rc, prio);
277  free(prio);
278  if (session != NULL) {
279  gnutls_free(session);
280  }
281  return NULL;
282 }
283 
299 int
300 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
301 {
302  int rc = GNUTLS_E_SUCCESS;
303  unsigned int dh_bits = 0;
304 
305  rc = gnutls_dh_params_init(dh_params);
306  if (rc != GNUTLS_E_SUCCESS) {
307  goto error;
308  }
309 
310  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
311  GNUTLS_SEC_PARAM_NORMAL);
312  if (dh_bits == 0) {
313  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
314  goto error;
315  }
316  dh_bits = get_bound_dh_bits(dh_bits);
317 
318  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
319  dh_bits);
320  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
321  if (rc != GNUTLS_E_SUCCESS) {
322  goto error;
323  }
324 
325  return pcmk_rc_ok;
326 
327 error:
328  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
329  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
330  return EPROTO;
331 }
332 
344 int
345 pcmk__read_handshake_data(pcmk__client_t *client)
346 {
347  int rc = 0;
348 
349  CRM_ASSERT(client && client->remote && client->remote->tls_session);
350 
351  do {
352  rc = gnutls_handshake(*client->remote->tls_session);
353  } while (rc == GNUTLS_E_INTERRUPTED);
354 
355  if (rc == GNUTLS_E_AGAIN) {
356  /* No more data is available at the moment. This function should be
357  * invoked again once the client sends more.
358  */
359  return EAGAIN;
360  } else if (rc != GNUTLS_E_SUCCESS) {
361  crm_err("TLS handshake with remote client failed: %s "
362  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
363  return EPROTO;
364  }
365  return pcmk_rc_ok;
366 }
367 
368 // \return Standard Pacemaker return code
369 static int
370 send_tls(gnutls_session_t *session, struct iovec *iov)
371 {
372  const char *unsent = iov->iov_base;
373  size_t unsent_len = iov->iov_len;
374  ssize_t gnutls_rc;
375 
376  if (unsent == NULL) {
377  return EINVAL;
378  }
379 
380  crm_trace("Sending TLS message of %llu bytes",
381  (unsigned long long) unsent_len);
382  while (true) {
383  gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
384 
385  if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
386  crm_trace("Retrying to send %llu bytes remaining",
387  (unsigned long long) unsent_len);
388 
389  } else if (gnutls_rc < 0) {
390  // Caller can log as error if necessary
391  crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
392  gnutls_strerror((int) gnutls_rc),
393  (long long) gnutls_rc);
394  return ECONNABORTED;
395 
396  } else if (gnutls_rc < unsent_len) {
397  crm_trace("Sent %lld of %llu bytes remaining",
398  (long long) gnutls_rc, (unsigned long long) unsent_len);
399  unsent_len -= gnutls_rc;
400  unsent += gnutls_rc;
401  } else {
402  crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
403  break;
404  }
405  }
406  return pcmk_rc_ok;
407 }
408 #endif
409 
410 // \return Standard Pacemaker return code
411 static int
412 send_plaintext(int sock, struct iovec *iov)
413 {
414  const char *unsent = iov->iov_base;
415  size_t unsent_len = iov->iov_len;
416  ssize_t write_rc;
417 
418  if (unsent == NULL) {
419  return EINVAL;
420  }
421 
422  crm_debug("Sending plaintext message of %llu bytes to socket %d",
423  (unsigned long long) unsent_len, sock);
424  while (true) {
425  write_rc = write(sock, unsent, unsent_len);
426  if (write_rc < 0) {
427  int rc = errno;
428 
429  if ((errno == EINTR) || (errno == EAGAIN)) {
430  crm_trace("Retrying to send %llu bytes remaining to socket %d",
431  (unsigned long long) unsent_len, sock);
432  continue;
433  }
434 
435  // Caller can log as error if necessary
436  crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
437  pcmk_rc_str(rc), rc, sock);
438  return rc;
439 
440  } else if (write_rc < unsent_len) {
441  crm_trace("Sent %lld of %llu bytes remaining",
442  (long long) write_rc, (unsigned long long) unsent_len);
443  unsent += write_rc;
444  unsent_len -= write_rc;
445  continue;
446 
447  } else {
448  crm_trace("Sent all %lld bytes remaining: %.100s",
449  (long long) write_rc, (char *) (iov->iov_base));
450  break;
451  }
452  }
453  return pcmk_rc_ok;
454 }
455 
456 // \return Standard Pacemaker return code
457 static int
458 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
459 {
460  int rc = pcmk_rc_ok;
461 
462  for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
463 #ifdef HAVE_GNUTLS_GNUTLS_H
464  if (remote->tls_session) {
465  rc = send_tls(remote->tls_session, &(iov[lpc]));
466  continue;
467  }
468 #endif
469  if (remote->tcp_socket) {
470  rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
471  } else {
472  rc = ESOCKTNOSUPPORT;
473  }
474  }
475  return rc;
476 }
477 
487 int
488 pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg)
489 {
490  int rc = pcmk_rc_ok;
491  static uint64_t id = 0;
492  char *xml_text = NULL;
493 
494  struct iovec iov[2];
495  struct remote_header_v0 *header;
496 
497  CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
498 
499  xml_text = dump_xml_unformatted(msg);
500  CRM_CHECK(xml_text != NULL, return EINVAL);
501 
502  header = calloc(1, sizeof(struct remote_header_v0));
503  CRM_ASSERT(header != NULL);
504 
505  iov[0].iov_base = header;
506  iov[0].iov_len = sizeof(struct remote_header_v0);
507 
508  iov[1].iov_base = xml_text;
509  iov[1].iov_len = 1 + strlen(xml_text);
510 
511  id++;
512  header->id = id;
513  header->endian = ENDIAN_LOCAL;
514  header->version = REMOTE_MSG_VERSION;
515  header->payload_offset = iov[0].iov_len;
516  header->payload_uncompressed = iov[1].iov_len;
517  header->size_total = iov[0].iov_len + iov[1].iov_len;
518 
519  rc = remote_send_iovs(remote, iov, 2);
520  if (rc != pcmk_rc_ok) {
521  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
522  pcmk_rc_str(rc), rc);
523  }
524 
525  free(iov[0].iov_base);
526  free(iov[1].iov_base);
527  return rc;
528 }
529 
539 xmlNode *
541 {
542  xmlNode *xml = NULL;
543  struct remote_header_v0 *header = localized_remote_header(remote);
544 
545  if (header == NULL) {
546  return NULL;
547  }
548 
549  /* Support compression on the receiving end now, in case we ever want to add it later */
550  if (header->payload_compressed) {
551  int rc = 0;
552  unsigned int size_u = 1 + header->payload_uncompressed;
553  char *uncompressed = calloc(1, header->payload_offset + size_u);
554 
555  crm_trace("Decompressing message data %d bytes into %d bytes",
556  header->payload_compressed, size_u);
557 
558  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
559  remote->buffer + header->payload_offset,
560  header->payload_compressed, 1, 0);
561 
562  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
563  crm_warn("Couldn't decompress v%d message, we only understand v%d",
564  header->version, REMOTE_MSG_VERSION);
565  free(uncompressed);
566  return NULL;
567 
568  } else if (rc != BZ_OK) {
569  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
570  bz2_strerror(rc), rc);
571  free(uncompressed);
572  return NULL;
573  }
574 
575  CRM_ASSERT(size_u == header->payload_uncompressed);
576 
577  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
578  remote->buffer_size = header->payload_offset + size_u;
579 
580  free(remote->buffer);
581  remote->buffer = uncompressed;
582  header = localized_remote_header(remote);
583  }
584 
585  /* take ownership of the buffer */
586  remote->buffer_offset = 0;
587 
588  CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
589 
590  xml = string2xml(remote->buffer + header->payload_offset);
591  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
592  crm_warn("Couldn't parse v%d message, we only understand v%d",
593  header->version, REMOTE_MSG_VERSION);
594 
595  } else if (xml == NULL) {
596  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
597  }
598 
599  return xml;
600 }
601 
602 static int
603 get_remote_socket(pcmk__remote_t *remote)
604 {
605 #ifdef HAVE_GNUTLS_GNUTLS_H
606  if (remote->tls_session) {
607  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
608 
609  return GPOINTER_TO_INT(sock_ptr);
610  }
611 #endif
612 
613  if (remote->tcp_socket) {
614  return remote->tcp_socket;
615  }
616 
617  crm_err("Remote connection type undetermined (bug?)");
618  return -1;
619 }
620 
632 int
633 pcmk__remote_ready(pcmk__remote_t *remote, int timeout_ms)
634 {
635  struct pollfd fds = { 0, };
636  int sock = 0;
637  int rc = 0;
638  time_t start;
639  int timeout = timeout_ms;
640 
641  sock = get_remote_socket(remote);
642  if (sock <= 0) {
643  crm_trace("No longer connected");
644  return ENOTCONN;
645  }
646 
647  start = time(NULL);
648  errno = 0;
649  do {
650  fds.fd = sock;
651  fds.events = POLLIN;
652 
653  /* If we got an EINTR while polling, and we have a
654  * specific timeout we are trying to honor, attempt
655  * to adjust the timeout to the closest second. */
656  if (errno == EINTR && (timeout > 0)) {
657  timeout = timeout_ms - ((time(NULL) - start) * 1000);
658  if (timeout < 1000) {
659  timeout = 1000;
660  }
661  }
662 
663  rc = poll(&fds, 1, timeout);
664  } while (rc < 0 && errno == EINTR);
665 
666  if (rc < 0) {
667  return errno;
668  }
669  return (rc == 0)? ETIME : pcmk_rc_ok;
670 }
671 
684 static int
685 read_available_remote_data(pcmk__remote_t *remote)
686 {
687  int rc = pcmk_rc_ok;
688  size_t read_len = sizeof(struct remote_header_v0);
689  struct remote_header_v0 *header = localized_remote_header(remote);
690  bool received = false;
691  ssize_t read_rc;
692 
693  if(header) {
694  /* Stop at the end of the current message */
695  read_len = header->size_total;
696  }
697 
698  /* automatically grow the buffer when needed */
699  if(remote->buffer_size < read_len) {
700  remote->buffer_size = 2 * read_len;
701  crm_trace("Expanding buffer to %llu bytes",
702  (unsigned long long) remote->buffer_size);
703  remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
704  }
705 
706 #ifdef HAVE_GNUTLS_GNUTLS_H
707  if (!received && remote->tls_session) {
708  read_rc = gnutls_record_recv(*(remote->tls_session),
709  remote->buffer + remote->buffer_offset,
710  remote->buffer_size - remote->buffer_offset);
711  if (read_rc == GNUTLS_E_INTERRUPTED) {
712  rc = EINTR;
713  } else if (read_rc == GNUTLS_E_AGAIN) {
714  rc = EAGAIN;
715  } else if (read_rc < 0) {
716  crm_debug("TLS receive failed: %s (%lld)",
717  gnutls_strerror(read_rc), (long long) read_rc);
718  rc = EIO;
719  }
720  received = true;
721  }
722 #endif
723 
724  if (!received && remote->tcp_socket) {
725  read_rc = read(remote->tcp_socket,
726  remote->buffer + remote->buffer_offset,
727  remote->buffer_size - remote->buffer_offset);
728  if (read_rc < 0) {
729  rc = errno;
730  }
731  received = true;
732  }
733 
734  if (!received) {
735  crm_err("Remote connection type undetermined (bug?)");
736  return ESOCKTNOSUPPORT;
737  }
738 
739  /* process any errors. */
740  if (read_rc > 0) {
741  remote->buffer_offset += read_rc;
742  /* always null terminate buffer, the +1 to alloc always allows for this. */
743  remote->buffer[remote->buffer_offset] = '\0';
744  crm_trace("Received %lld more bytes (%llu total)",
745  (long long) read_rc,
746  (unsigned long long) remote->buffer_offset);
747 
748  } else if ((rc == EINTR) || (rc == EAGAIN)) {
749  crm_trace("No data available for non-blocking remote read: %s (%d)",
750  pcmk_rc_str(rc), rc);
751 
752  } else if (read_rc == 0) {
753  crm_debug("End of remote data encountered after %llu bytes",
754  (unsigned long long) remote->buffer_offset);
755  return ENOTCONN;
756 
757  } else {
758  crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
759  (unsigned long long) remote->buffer_offset,
760  pcmk_rc_str(rc), rc);
761  return ENOTCONN;
762  }
763 
764  header = localized_remote_header(remote);
765  if(header) {
766  if(remote->buffer_offset < header->size_total) {
767  crm_trace("Read partial remote message (%llu of %u bytes)",
768  (unsigned long long) remote->buffer_offset,
769  header->size_total);
770  } else {
771  crm_trace("Read full remote message of %llu bytes",
772  (unsigned long long) remote->buffer_offset);
773  return pcmk_rc_ok;
774  }
775  }
776 
777  return EAGAIN;
778 }
779 
790 int
792 {
793  int rc = pcmk_rc_ok;
794  time_t start = time(NULL);
795  int remaining_timeout = 0;
796 
797  if (timeout_ms == 0) {
798  timeout_ms = 10000;
799  } else if (timeout_ms < 0) {
800  timeout_ms = 60000;
801  }
802 
803  remaining_timeout = timeout_ms;
804  while (remaining_timeout > 0) {
805 
806  crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
807  remaining_timeout, timeout_ms);
808  rc = pcmk__remote_ready(remote, remaining_timeout);
809 
810  if (rc == ETIME) {
811  crm_err("Timed out (%d ms) while waiting for remote data",
812  remaining_timeout);
813  return rc;
814 
815  } else if (rc != pcmk_rc_ok) {
816  crm_debug("Wait for remote data aborted (will retry): %s "
817  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
818 
819  } else {
820  rc = read_available_remote_data(remote);
821  if (rc == pcmk_rc_ok) {
822  return rc;
823  } else if (rc == EAGAIN) {
824  crm_trace("Waiting for more remote data");
825  } else {
826  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
827  pcmk_rc_str(rc), rc);
828  }
829  }
830 
831  // Don't waste time retrying after fatal errors
832  if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
833  return rc;
834  }
835 
836  remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
837  }
838  return ETIME;
839 }
840 
/* State carried by the timer that checks whether an asynchronous connect()
 * has finished
 */
struct tcp_async_cb_data {
    /* Socket being connected */
    int sock;

    /* How long the connection attempt may take, in milliseconds */
    int timeout_ms;

    /* When the attempt began (0 if connect() succeeded immediately) */
    time_t start;

    /* Caller-supplied data handed back to the callback */
    void *userdata;

    /* Function to call with the final result of the attempt */
    void (*callback) (void *userdata, int rc, int sock);
};
848 
849 // \return TRUE if timer should be rescheduled, FALSE otherwise
850 static gboolean
851 check_connect_finished(gpointer userdata)
852 {
853  struct tcp_async_cb_data *cb_data = userdata;
854  int rc;
855 
856  fd_set rset, wset;
857  struct timeval ts = { 0, };
858 
859  if (cb_data->start == 0) {
860  // Last connect() returned success immediately
861  rc = pcmk_rc_ok;
862  goto dispatch_done;
863  }
864 
865  // If the socket is ready for reading or writing, the connect succeeded
866  FD_ZERO(&rset);
867  FD_SET(cb_data->sock, &rset);
868  wset = rset;
869  rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
870 
871  if (rc < 0) { // select() error
872  rc = errno;
873  if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
874  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
875  return TRUE; // There is time left, so reschedule timer
876  } else {
877  rc = ETIMEDOUT;
878  }
879  }
880  crm_trace("Could not check socket %d for connection success: %s (%d)",
881  cb_data->sock, pcmk_rc_str(rc), rc);
882 
883  } else if (rc == 0) { // select() timeout
884  if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
885  return TRUE; // There is time left, so reschedule timer
886  }
887  crm_debug("Timed out while waiting for socket %d connection success",
888  cb_data->sock);
889  rc = ETIMEDOUT;
890 
891  // select() returned number of file descriptors that are ready
892 
893  } else if (FD_ISSET(cb_data->sock, &rset)
894  || FD_ISSET(cb_data->sock, &wset)) {
895 
896  // The socket is ready; check it for connection errors
897  int error = 0;
898  socklen_t len = sizeof(error);
899 
900  if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
901  rc = errno;
902  crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
903  cb_data->sock, pcmk_rc_str(rc), rc);
904  } else if (error != 0) {
905  rc = error;
906  crm_trace("Socket %d connected with error: %s (%d)",
907  cb_data->sock, pcmk_rc_str(rc), rc);
908  } else {
909  rc = pcmk_rc_ok;
910  }
911 
912  } else { // Should not be possible
913  crm_trace("select() succeeded, but socket %d not in resulting "
914  "read/write sets", cb_data->sock);
915  rc = EAGAIN;
916  }
917 
918  dispatch_done:
919  if (rc == pcmk_rc_ok) {
920  crm_trace("Socket %d is connected", cb_data->sock);
921  } else {
922  close(cb_data->sock);
923  cb_data->sock = -1;
924  }
925 
926  if (cb_data->callback) {
927  cb_data->callback(cb_data->userdata, rc, cb_data->sock);
928  }
929  free(cb_data);
930  return FALSE; // Do not reschedule timer
931 }
932 
951 static int
952 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
953  int timeout_ms, int *timer_id, void *userdata,
954  void (*callback) (void *userdata, int rc, int sock))
955 {
956  int rc = 0;
957  int interval = 500;
958  int timer;
959  struct tcp_async_cb_data *cb_data = NULL;
960 
961  rc = pcmk__set_nonblocking(sock);
962  if (rc != pcmk_rc_ok) {
963  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
964  pcmk_rc_str(rc), rc);
965  return rc;
966  }
967 
968  rc = connect(sock, addr, addrlen);
969  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
970  rc = errno;
971  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
972  pcmk_rc_str(rc), rc);
973  return rc;
974  }
975 
976  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
977  cb_data->userdata = userdata;
978  cb_data->callback = callback;
979  cb_data->sock = sock;
980  cb_data->timeout_ms = timeout_ms;
981 
982  if (rc == 0) {
983  /* The connect was successful immediately, we still return to mainloop
984  * and let this callback get called later. This avoids the user of this api
985  * to have to account for the fact the callback could be invoked within this
986  * function before returning. */
987  cb_data->start = 0;
988  interval = 1;
989  } else {
990  cb_data->start = time(NULL);
991  }
992 
993  /* This timer function does a non-blocking poll on the socket to see if we
994  * can use it. Once we can, the connect has completed. This method allows us
995  * to connect without blocking the mainloop.
996  *
997  * @TODO Use a mainloop fd callback for this instead of polling. Something
998  * about the way mainloop is currently polling prevents this from
999  * working at the moment though. (See connect(2) regarding EINPROGRESS
1000  * for possible new handling needed.)
1001  */
1002  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1003  interval, sock);
1004  timer = g_timeout_add(interval, check_connect_finished, cb_data);
1005  if (timer_id) {
1006  *timer_id = timer;
1007  }
1008 
1009  // timer callback should be taking care of cb_data
1010  // cppcheck-suppress memleak
1011  return pcmk_rc_ok;
1012 }
1013 
1024 static int
1025 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1026 {
1027  int rc = connect(sock, addr, addrlen);
1028 
1029  if (rc < 0) {
1030  rc = errno;
1031  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1032  pcmk_rc_str(rc), rc);
1033  return rc;
1034  }
1035 
1036  rc = pcmk__set_nonblocking(sock);
1037  if (rc != pcmk_rc_ok) {
1038  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1039  pcmk_rc_str(rc), rc);
1040  return rc;
1041  }
1042 
1043  return pcmk_ok;
1044 }
1045 
/*!
 * \internal
 * \brief Connect to a remote host over TCP, trying each of its addresses
 *
 * \param[in]  host      Name or address of host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Timeout passed to the asynchronous connect (used
 *                       only when \p callback is not NULL)
 * \param[out] timer_id  If not NULL, where to store the ID of the timer
 *                       checking an asynchronous connect
 * \param[out] sock_fd   Where to store the socket file descriptor (-1 if no
 *                       connection attempt could be made or started)
 * \param[in]  userdata  Data to pass to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and call this
 *                       with the final result; if NULL, connect synchronously
 *
 * \return pcmk_rc_ok if the connection was made (or, with a callback,
 *         successfully started), otherwise a standard Pacemaker return code
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;
    rc = getaddrinfo(server, NULL, &hints, &res);
    if (rc != 0) {
        crm_err("Unable to get IP address info for %s: %s",
                server, gai_strerror(rc));
        rc = ENOTCONN;
        goto async_cleanup;
    }
    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = res->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* Store this attempt's result in rc, so that a failure recorded for
         * an earlier address is not returned alongside a valid socket
         */
        if (callback) {
            rc = connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
                                      timeout, timer_id, userdata, callback);
            if (rc == pcmk_rc_ok) {
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else {
            rc = connect_socket_once(sock, rp->ai_addr, rp->ai_addrlen);
            if (rc == pcmk_rc_ok) {
                break;          /* Success */
            }
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1153 
/*!
 * \internal
 * \brief Convert an IPv4 or IPv6 socket address to its text form
 *
 * \param[in]  sa  Socket address (struct sockaddr_in or struct sockaddr_in6,
 *                 selected by its address family)
 * \param[out] s   Where to store the text; must have room for at least
 *                 INET6_ADDRSTRLEN bytes ("<invalid>" is stored for any
 *                 other address family)
 */
void
pcmk__sockaddr2str(void *sa, char *s)
{
    const int family = ((struct sockaddr *) sa)->sa_family;

    if (family == AF_INET) {
        struct sockaddr_in *ipv4 = sa;

        inet_ntop(AF_INET, &(ipv4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (family == AF_INET6) {
        struct sockaddr_in6 *ipv6 = sa;

        inet_ntop(AF_INET6, &(ipv6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1183 
1193 int
1194 pcmk__accept_remote_connection(int ssock, int *csock)
1195 {
1196  int rc;
1197  struct sockaddr_storage addr;
1198  socklen_t laddr = sizeof(addr);
1199  char addr_str[INET6_ADDRSTRLEN];
1200 
1201  /* accept the connection */
1202  memset(&addr, 0, sizeof(addr));
1203  *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1204  if (*csock == -1) {
1205  rc = errno;
1206  crm_err("Could not accept remote client connection: %s "
1207  CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1208  return rc;
1209  }
1210  pcmk__sockaddr2str(&addr, addr_str);
1211  crm_info("Accepted new remote client connection from %s", addr_str);
1212 
1213  rc = pcmk__set_nonblocking(*csock);
1214  if (rc != pcmk_rc_ok) {
1215  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1216  pcmk_rc_str(rc), rc);
1217  close(*csock);
1218  *csock = -1;
1219  return rc;
1220  }
1221 
1222 #ifdef TCP_USER_TIMEOUT
1223  if (pcmk__get_sbd_timeout() > 0) {
1224  // Time to fail and retry before watchdog
1225  unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2;
1226 
1227  rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1228  &optval, sizeof(optval));
1229  if (rc < 0) {
1230  rc = errno;
1231  crm_err("Could not set TCP timeout to %d ms on remote connection: "
1232  "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1233  close(*csock);
1234  *csock = -1;
1235  return rc;
1236  }
1237  }
1238 #endif
1239 
1240  return rc;
1241 }
1242 
1248 int
1250 {
1251  static int port = 0;
1252 
1253  if (port == 0) {
1254  const char *env = getenv("PCMK_remote_port");
1255 
1256  if (env) {
1257  errno = 0;
1258  port = strtol(env, NULL, 10);
1259  if (errno || (port < 1) || (port > 65535)) {
1260  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1261  env, DEFAULT_REMOTE_PORT);
1262  port = DEFAULT_REMOTE_PORT;
1263  }
1264  } else {
1265  port = DEFAULT_REMOTE_PORT;
1266  }
1267  }
1268  return port;
1269 }
pcmk__cpg_host_t host
Definition: cpg.c:49
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:226
A dumping ground.
int pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg)
Definition: remote.c:488
const char * pcmk_strerror(int rc)
Definition: results.c:58
#define ETIME
Definition: portability.h:150
const char * bz2_strerror(int rc)
Definition: results.c:742
uint32_t payload_compressed
Definition: remote.c:152
int pcmk__scan_min_int(const char *text, int *result, int minimum)
Definition: strings.c:127
size_t buffer_offset
Definition: ipc_internal.h:102
void pcmk__sockaddr2str(void *sa, char *s)
Definition: remote.c:1166
uint32_t payload_uncompressed
Definition: remote.c:153
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:210
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:370
#define ENDIAN_LOCAL
Definition: remote.c:70
int crm_default_remote_port()
Get the default remote connection TCP port on this host.
Definition: remote.c:1249
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:146
xmlNode * string2xml(const char *input)
Definition: xml.c:869
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:51
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:528
#define crm_warn(fmt, args...)
Definition: logging.h:359
#define crm_debug(fmt, args...)
Definition: logging.h:363
void gnutls_session_t
Definition: cib_remote.c:42
#define crm_trace(fmt, args...)
Definition: logging.h:364
uint64_t id
Definition: remote.c:148
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define __swab64(x)
Definition: remote.c:58
size_t buffer_size
Definition: ipc_internal.h:101
Wrappers for and extensions to libxml2.
#define PCMK__NELEM(a)
Definition: internal.h:42
int pcmk_legacy2rc(int legacy_rc)
Definition: results.c:428
uint32_t payload_offset
Definition: remote.c:151
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:150
#define CRM_XS
Definition: logging.h:55
#define X32T
Definition: config.h:647
#define __swab32(x)
Definition: remote.c:52
#define REMOTE_MSG_VERSION
Definition: remote.c:69
#define crm_err(fmt, args...)
Definition: logging.h:358
#define CRM_ASSERT(expr)
Definition: results.h:42
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2018
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:1194
#define pcmk_ok
Definition: results.h:68
int pcmk__remote_ready(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:633
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:1063
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:791
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:540
struct pcmk__remote_s * remote
Definition: ipc_internal.h:156
unsigned int timeout
Definition: pcmk_fence.c:31
#define crm_info(fmt, args...)
Definition: logging.h:361
long pcmk__get_sbd_timeout(void)
Definition: watchdog.c:240
uint32_t version
Definition: remote.c:147
uint64_t flags
Definition: remote.c:149
int pcmk__set_nonblocking(int fd)
Definition: io.c:517