pacemaker  2.0.2-debe490
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2018 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This source code is licensed under the GNU Lesser General Public License
5  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
6  */
7 
8 #include <crm_internal.h>
9 #include <crm/crm.h>
10 
11 #include <sys/param.h>
12 #include <stdio.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <unistd.h>
16 #include <sys/socket.h>
17 #include <arpa/inet.h>
18 #include <netinet/in.h>
19 #include <netinet/ip.h>
20 #include <netinet/tcp.h>
21 #include <netdb.h>
22 #include <stdlib.h>
23 #include <errno.h>
24 #include <inttypes.h> /* X32T ~ PRIx32 */
25 
26 #include <glib.h>
27 #include <bzlib.h>
28 
29 #include <crm/common/ipcs.h>
30 #include <crm/common/xml.h>
31 #include <crm/common/mainloop.h>
33 
34 #ifdef HAVE_GNUTLS_GNUTLS_H
35 # undef KEYFILE
36 # include <gnutls/gnutls.h>
37 
38 const int psk_tls_kx_order[] = {
39  GNUTLS_KX_DHE_PSK,
40  GNUTLS_KX_PSK,
41 };
42 
43 const int anon_tls_kx_order[] = {
44  GNUTLS_KX_ANON_DH,
45  GNUTLS_KX_DHE_RSA,
46  GNUTLS_KX_DHE_DSS,
47  GNUTLS_KX_RSA,
48  0
49 };
50 #endif
51 
/* Byte-swapping macros, modelled on linux/swab.h */
#ifdef HAVE_LINUX_SWAB_H
#  include <linux/swab.h>
#else
/*
 * The casts are necessary for constants, because there is no portable way
 * to know for sure how the U/UL/ULL suffixes map to 16-, 32- and 64-bit
 * types.
 *
 * Each macro evaluates its argument more than once, so do not pass
 * expressions with side effects.
 */

/* Exchange the two bytes of a 16-bit value */
#define __swab16(x) ((uint16_t)( \
    ((((uint16_t)(x)) >> 8) & (uint16_t)0x00ffU) | \
    ((((uint16_t)(x)) << 8) & (uint16_t)0xff00U)))

/* Reverse the four bytes of a 32-bit value */
#define __swab32(x) ((uint32_t)( \
    ((((uint32_t)(x)) >> 24) & (uint32_t)0x000000ffUL) | \
    ((((uint32_t)(x)) >>  8) & (uint32_t)0x0000ff00UL) | \
    ((((uint32_t)(x)) <<  8) & (uint32_t)0x00ff0000UL) | \
    ((((uint32_t)(x)) << 24) & (uint32_t)0xff000000UL)))

/* Reverse the eight bytes of a 64-bit value */
#define __swab64(x) ((uint64_t)( \
    ((((uint64_t)(x)) >> 56) & (uint64_t)0x00000000000000ffULL) | \
    ((((uint64_t)(x)) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    ((((uint64_t)(x)) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    ((((uint64_t)(x)) >>  8) & (uint64_t)0x00000000ff000000ULL) | \
    ((((uint64_t)(x)) <<  8) & (uint64_t)0x000000ff00000000ULL) | \
    ((((uint64_t)(x)) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    ((((uint64_t)(x)) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    ((((uint64_t)(x)) << 56) & (uint64_t)0xff00000000000000ULL)))
#endif
80 
/* Message format version written into every header this file sends */
#define REMOTE_MSG_VERSION 1

/* Marker stored in the header's endian field; a receiver that sees the
 * byte-swapped form knows the sender has the opposite byte order.
 */
#define ENDIAN_LOCAL 0xBADADBBD

/* Header that precedes the payload of every remote message. The layout is
 * packed, so field order and sizes must not change.
 */
struct crm_remote_header_v0
{
    /* Detect messages from hosts with different endian-ness */
    uint32_t endian;

    /* Message format version (senders here use REMOTE_MSG_VERSION) */
    uint32_t version;

    /* Message ID (senders here use a per-process counter) */
    uint64_t id;

    /* Not set by the sender in this file (left zeroed) */
    uint64_t flags;

    /* Header length plus payload length */
    uint32_t size_total;

    /* Offset of the payload from the start of the message */
    uint32_t payload_offset;

    /* Nonzero means the payload is bzip2-compressed, with this length */
    uint32_t payload_compressed;

    /* Length of the payload once uncompressed */
    uint32_t payload_uncompressed;

    /* New fields get added here */

} __attribute__ ((packed));
98 
99 static struct crm_remote_header_v0 *
100 crm_remote_header(crm_remote_t * remote)
101 {
102  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
103  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
104  return NULL;
105 
106  } else if(header->endian != ENDIAN_LOCAL) {
107  uint32_t endian = __swab32(header->endian);
108 
109  CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
110  if(endian != ENDIAN_LOCAL) {
111  crm_err("Invalid message detected, endian mismatch: %" X32T
112  " is neither %" X32T " nor the swab'd %" X32T,
113  ENDIAN_LOCAL, header->endian, endian);
114  return NULL;
115  }
116 
117  header->id = __swab64(header->id);
118  header->flags = __swab64(header->flags);
119  header->endian = __swab32(header->endian);
120 
121  header->version = __swab32(header->version);
122  header->size_total = __swab32(header->size_total);
123  header->payload_offset = __swab32(header->payload_offset);
124  header->payload_compressed = __swab32(header->payload_compressed);
125  header->payload_uncompressed = __swab32(header->payload_uncompressed);
126  }
127 
128  return header;
129 }
130 
131 #ifdef HAVE_GNUTLS_GNUTLS_H
132 
133 int
134 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
135 {
136  int rc = 0;
137  int pollrc = 0;
138  time_t start = time(NULL);
139 
140  do {
141  rc = gnutls_handshake(*remote->tls_session);
142  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
143  pollrc = crm_remote_ready(remote, 1000);
144  if (pollrc < 0) {
145  /* poll returned error, there is no hope */
146  rc = -1;
147  }
148  }
150  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
151  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
153  if (rc < 0) {
154  crm_trace("gnutls_handshake() failed with %d", rc);
155  }
156  return rc;
157 }
158 
165 static void
166 pcmk__set_minimum_dh_bits(gnutls_session_t *session)
167 {
168  const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
169 
170  if (dh_min_bits_s) {
171  int dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
172 
173  /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
174  * the priority string imply the DH requirements, but this is the only
175  * way to give the user control over compatibility with older servers.
176  */
177  if (dh_min_bits > 0) {
178  crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
179  dh_min_bits);
180  gnutls_dh_set_prime_bits(*session, dh_min_bits);
181  }
182  }
183 }
184 
/*!
 * \internal
 * \brief Clamp a Diffie-Hellman prime size to the user-configured limits
 *
 * The limits come from the PCMK_dh_min_bits and PCMK_dh_max_bits environment
 * variables; a limit that is unset or not positive is not applied. A maximum
 * smaller than the minimum is ignored with a warning.
 *
 * \param[in] dh_bits  Proposed prime size in bits
 *
 * \return dh_bits, raised to the minimum or lowered to the maximum if needed
 */
static unsigned int
pcmk__bound_dh_bits(unsigned int dh_bits)
{
    const char *min_env = getenv("PCMK_dh_min_bits");
    const char *max_env = getenv("PCMK_dh_max_bits");
    int lower = (min_env != NULL)? crm_parse_int(min_env, "0") : 0;
    int upper = (max_env != NULL)? crm_parse_int(max_env, "0") : 0;

    /* Both limits can only be positive here if both variables were set */
    if ((lower > 0) && (upper > 0) && (upper < lower)) {
        crm_warn("Ignoring PCMK_dh_max_bits because it is less than PCMK_dh_min_bits");
        upper = 0;
    }

    /* The minimum takes precedence over the maximum */
    if ((lower > 0) && (dh_bits < lower)) {
        return lower;
    }
    if ((upper > 0) && (dh_bits > upper)) {
        return upper;
    }
    return dh_bits;
}
212 
225 pcmk__new_tls_session(int csock, unsigned int conn_type,
226  gnutls_credentials_type_t cred_type, void *credentials)
227 {
228  int rc = GNUTLS_E_SUCCESS;
229  const char *prio = NULL;
230  gnutls_session_t *session = NULL;
231 
232  if (cred_type == GNUTLS_CRD_ANON) {
233  // http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
234  prio = PCMK_GNUTLS_PRIORITIES ":+ANON-DH";
235  } else {
236  prio = PCMK_GNUTLS_PRIORITIES ":+DHE-PSK:+PSK";
237  }
238 
239  session = gnutls_malloc(sizeof(gnutls_session_t));
240  if (session == NULL) {
241  rc = GNUTLS_E_MEMORY_ERROR;
242  goto error;
243  }
244 
245  rc = gnutls_init(session, conn_type);
246  if (rc != GNUTLS_E_SUCCESS) {
247  goto error;
248  }
249 
250  /* @TODO On the server side, it would be more efficient to cache the
251  * priority with gnutls_priority_init2() and set it with
252  * gnutls_priority_set() for all sessions.
253  */
254  rc = gnutls_priority_set_direct(*session, prio, NULL);
255  if (rc != GNUTLS_E_SUCCESS) {
256  goto error;
257  }
258  if (conn_type == GNUTLS_CLIENT) {
259  pcmk__set_minimum_dh_bits(session);
260  }
261 
262  gnutls_transport_set_ptr(*session,
263  (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
264 
265  rc = gnutls_credentials_set(*session, cred_type, credentials);
266  if (rc != GNUTLS_E_SUCCESS) {
267  goto error;
268  }
269  return session;
270 
271 error:
272  crm_err("Could not initialize %s TLS %s session: %s "
273  CRM_XS " rc=%d priority='%s'",
274  (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
275  (conn_type == GNUTLS_SERVER)? "server" : "client",
276  gnutls_strerror(rc), rc, prio);
277  if (session != NULL) {
278  gnutls_free(session);
279  }
280  return NULL;
281 }
282 
298 int
299 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
300 {
301  int rc = GNUTLS_E_SUCCESS;
302  unsigned int dh_bits = 0;
303 
304  rc = gnutls_dh_params_init(dh_params);
305  if (rc != GNUTLS_E_SUCCESS) {
306  goto error;
307  }
308 
309 #ifdef HAVE_GNUTLS_SEC_PARAM_TO_PK_BITS
310  dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
311  GNUTLS_SEC_PARAM_NORMAL);
312  if (dh_bits == 0) {
313  rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
314  goto error;
315  }
316 #else
317  dh_bits = 1024;
318 #endif
319  dh_bits = pcmk__bound_dh_bits(dh_bits);
320 
321  crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
322  dh_bits);
323  rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
324  if (rc != GNUTLS_E_SUCCESS) {
325  goto error;
326  }
327 
328  return rc;
329 
330 error:
331  crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
332  CRM_XS " rc=%d", gnutls_strerror(rc), rc);
333  CRM_ASSERT(rc == GNUTLS_E_SUCCESS);
334  return rc;
335 }
336 
349 int
350 pcmk__read_handshake_data(crm_client_t *client)
351 {
352  int rc = 0;
353 
354  CRM_ASSERT(client && client->remote && client->remote->tls_session);
355 
356  do {
357  rc = gnutls_handshake(*client->remote->tls_session);
358  } while (rc == GNUTLS_E_INTERRUPTED);
359 
360  if (rc == GNUTLS_E_AGAIN) {
361  /* No more data is available at the moment. This function should be
362  * invoked again once the client sends more.
363  */
364  return 0;
365  } else if (rc != GNUTLS_E_SUCCESS) {
366  return rc;
367  }
368  return 1;
369 }
370 
371 static int
372 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
373 {
374  const char *unsent = buf;
375  int rc = 0;
376  int total_send;
377 
378  if (buf == NULL) {
379  return -EINVAL;
380  }
381 
382  total_send = len;
383  crm_trace("Message size: %llu", (unsigned long long) len);
384 
385  while (TRUE) {
386  rc = gnutls_record_send(*session, unsent, len);
387 
388  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
389  crm_trace("Retrying to send %llu bytes",
390  (unsigned long long) len);
391 
392  } else if (rc < 0) {
393  // Caller can log as error if necessary
394  crm_info("TLS connection terminated: %s " CRM_XS " rc=%d",
395  gnutls_strerror(rc), rc);
396  rc = -ECONNABORTED;
397  break;
398 
399  } else if (rc < len) {
400  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
401  len -= rc;
402  unsent += rc;
403  } else {
404  crm_trace("Sent all %d bytes", rc);
405  break;
406  }
407  }
408 
409  return rc < 0 ? rc : total_send;
410 }
411 #endif
412 
413 static int
414 crm_send_plaintext(int sock, const char *buf, size_t len)
415 {
416 
417  int rc = 0;
418  const char *unsent = buf;
419  int total_send;
420 
421  if (buf == NULL) {
422  return -EINVAL;
423  }
424  total_send = len;
425 
426  crm_trace("Message on socket %d: size=%llu",
427  sock, (unsigned long long) len);
428  retry:
429  rc = write(sock, unsent, len);
430  if (rc < 0) {
431  rc = -errno;
432  switch (errno) {
433  case EINTR:
434  case EAGAIN:
435  crm_trace("Retry");
436  goto retry;
437  default:
438  crm_perror(LOG_INFO,
439  "Could only write %d of the remaining %llu bytes",
440  rc, (unsigned long long) len);
441  break;
442  }
443 
444  } else if (rc < len) {
445  crm_trace("Only sent %d of %llu remaining bytes",
446  rc, (unsigned long long) len);
447  len -= rc;
448  unsent += rc;
449  goto retry;
450 
451  } else {
452  crm_trace("Sent %d bytes: %.100s", rc, buf);
453  }
454 
455  return rc < 0 ? rc : total_send;
456 
457 }
458 
459 static int
460 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
461 {
462  int rc = 0;
463 
464  for (int lpc = 0; (lpc < iovs) && (rc >= 0); lpc++) {
465 #ifdef HAVE_GNUTLS_GNUTLS_H
466  if (remote->tls_session) {
467  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
468  continue;
469  }
470 #endif
471  if (remote->tcp_socket) {
472  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
473  } else {
474  rc = -ESOCKTNOSUPPORT;
475  }
476  }
477  return rc;
478 }
479 
480 int
481 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
482 {
483  int rc = pcmk_ok;
484  static uint64_t id = 0;
485  char *xml_text = dump_xml_unformatted(msg);
486 
487  struct iovec iov[2];
488  struct crm_remote_header_v0 *header;
489 
490  if (xml_text == NULL) {
491  crm_err("Could not send remote message: no message provided");
492  return -EINVAL;
493  }
494 
495  header = calloc(1, sizeof(struct crm_remote_header_v0));
496  iov[0].iov_base = header;
497  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
498 
499  iov[1].iov_base = xml_text;
500  iov[1].iov_len = 1 + strlen(xml_text);
501 
502  id++;
503  header->id = id;
504  header->endian = ENDIAN_LOCAL;
505  header->version = REMOTE_MSG_VERSION;
506  header->payload_offset = iov[0].iov_len;
507  header->payload_uncompressed = iov[1].iov_len;
508  header->size_total = iov[0].iov_len + iov[1].iov_len;
509 
510  crm_trace("Sending len[0]=%d, start=%x",
511  (int)iov[0].iov_len, *(int*)(void*)xml_text);
512  rc = crm_remote_sendv(remote, iov, 2);
513  if (rc < 0) {
514  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
515  pcmk_strerror(rc), rc);
516  }
517 
518  free(iov[0].iov_base);
519  free(iov[1].iov_base);
520  return rc;
521 }
522 
523 
529 xmlNode *
531 {
532  xmlNode *xml = NULL;
533  struct crm_remote_header_v0 *header = crm_remote_header(remote);
534 
535  if (remote->buffer == NULL || header == NULL) {
536  return NULL;
537  }
538 
539  /* Support compression on the receiving end now, in case we ever want to add it later */
540  if (header->payload_compressed) {
541  int rc = 0;
542  unsigned int size_u = 1 + header->payload_uncompressed;
543  char *uncompressed = calloc(1, header->payload_offset + size_u);
544 
545  crm_trace("Decompressing message data %d bytes into %d bytes",
546  header->payload_compressed, size_u);
547 
548  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
549  remote->buffer + header->payload_offset,
550  header->payload_compressed, 1, 0);
551 
552  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
553  crm_warn("Couldn't decompress v%d message, we only understand v%d",
554  header->version, REMOTE_MSG_VERSION);
555  free(uncompressed);
556  return NULL;
557 
558  } else if (rc != BZ_OK) {
559  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
560  bz2_strerror(rc), rc);
561  free(uncompressed);
562  return NULL;
563  }
564 
565  CRM_ASSERT(size_u == header->payload_uncompressed);
566 
567  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
568  remote->buffer_size = header->payload_offset + size_u;
569 
570  free(remote->buffer);
571  remote->buffer = uncompressed;
572  header = crm_remote_header(remote);
573  }
574 
575  /* take ownership of the buffer */
576  remote->buffer_offset = 0;
577 
578  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
579 
580  xml = string2xml(remote->buffer + header->payload_offset);
581  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
582  crm_warn("Couldn't parse v%d message, we only understand v%d",
583  header->version, REMOTE_MSG_VERSION);
584 
585  } else if (xml == NULL) {
586  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
587  }
588 
589  return xml;
590 }
591 
601 int
602 crm_remote_ready(crm_remote_t *remote, int total_timeout)
603 {
604  struct pollfd fds = { 0, };
605  int sock = 0;
606  int rc = 0;
607  time_t start;
608  int timeout = total_timeout;
609 
610 #ifdef HAVE_GNUTLS_GNUTLS_H
611  if (remote->tls_session) {
612  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
613 
614  sock = GPOINTER_TO_INT(sock_ptr);
615  } else if (remote->tcp_socket) {
616 #else
617  if (remote->tcp_socket) {
618 #endif
619  sock = remote->tcp_socket;
620  } else {
621  crm_err("Unsupported connection type");
622  }
623 
624  if (sock <= 0) {
625  crm_trace("No longer connected");
626  return -ENOTCONN;
627  }
628 
629  start = time(NULL);
630  errno = 0;
631  do {
632  fds.fd = sock;
633  fds.events = POLLIN;
634 
635  /* If we got an EINTR while polling, and we have a
636  * specific timeout we are trying to honor, attempt
637  * to adjust the timeout to the closest second. */
638  if (errno == EINTR && (timeout > 0)) {
639  timeout = total_timeout - ((time(NULL) - start) * 1000);
640  if (timeout < 1000) {
641  timeout = 1000;
642  }
643  }
644 
645  rc = poll(&fds, 1, timeout);
646  } while (rc < 0 && errno == EINTR);
647 
648  return (rc < 0)? -errno : rc;
649 }
650 
651 
662 static size_t
663 crm_remote_recv_once(crm_remote_t * remote)
664 {
665  int rc = 0;
666  size_t read_len = sizeof(struct crm_remote_header_v0);
667  struct crm_remote_header_v0 *header = crm_remote_header(remote);
668 
669  if(header) {
670  /* Stop at the end of the current message */
671  read_len = header->size_total;
672  }
673 
674  /* automatically grow the buffer when needed */
675  if(remote->buffer_size < read_len) {
676  remote->buffer_size = 2 * read_len;
677  crm_trace("Expanding buffer to %llu bytes",
678  (unsigned long long) remote->buffer_size);
679 
680  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
681  CRM_ASSERT(remote->buffer != NULL);
682  }
683 
684 #ifdef HAVE_GNUTLS_GNUTLS_H
685  if (remote->tls_session) {
686  rc = gnutls_record_recv(*(remote->tls_session),
687  remote->buffer + remote->buffer_offset,
688  remote->buffer_size - remote->buffer_offset);
689  if (rc == GNUTLS_E_INTERRUPTED) {
690  rc = -EINTR;
691  } else if (rc == GNUTLS_E_AGAIN) {
692  rc = -EAGAIN;
693  } else if (rc < 0) {
694  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
695  rc = -pcmk_err_generic;
696  }
697  } else if (remote->tcp_socket) {
698 #else
699  if (remote->tcp_socket) {
700 #endif
701  errno = 0;
702  rc = read(remote->tcp_socket,
703  remote->buffer + remote->buffer_offset,
704  remote->buffer_size - remote->buffer_offset);
705  if(rc < 0) {
706  rc = -errno;
707  }
708 
709  } else {
710  crm_err("Unsupported connection type");
711  return -ESOCKTNOSUPPORT;
712  }
713 
714  /* process any errors. */
715  if (rc > 0) {
716  remote->buffer_offset += rc;
717  /* always null terminate buffer, the +1 to alloc always allows for this. */
718  remote->buffer[remote->buffer_offset] = '\0';
719  crm_trace("Received %u more bytes, %llu total",
720  rc, (unsigned long long) remote->buffer_offset);
721 
722  } else if (rc == -EINTR || rc == -EAGAIN) {
723  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
724 
725  } else if (rc == 0) {
726  crm_debug("EOF encoutered after %llu bytes",
727  (unsigned long long) remote->buffer_offset);
728  return -ENOTCONN;
729 
730  } else {
731  crm_debug("Error receiving message after %llu bytes: %s (%d)",
732  (unsigned long long) remote->buffer_offset,
733  pcmk_strerror(rc), rc);
734  return -ENOTCONN;
735  }
736 
737  header = crm_remote_header(remote);
738  if(header) {
739  if(remote->buffer_offset < header->size_total) {
740  crm_trace("Read less than the advertised length: %llu < %u bytes",
741  (unsigned long long) remote->buffer_offset,
742  header->size_total);
743  } else {
744  crm_trace("Read full message of %llu bytes",
745  (unsigned long long) remote->buffer_offset);
746  return remote->buffer_offset;
747  }
748  }
749 
750  return -EAGAIN;
751 }
752 
763 gboolean
764 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
765 {
766  int rc;
767  time_t start = time(NULL);
768  int remaining_timeout = 0;
769 
770  if (total_timeout == 0) {
771  total_timeout = 10000;
772  } else if (total_timeout < 0) {
773  total_timeout = 60000;
774  }
775  *disconnected = 0;
776 
777  remaining_timeout = total_timeout;
778  while ((remaining_timeout > 0) && !(*disconnected)) {
779 
780  crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
781  remaining_timeout, total_timeout);
782  rc = crm_remote_ready(remote, remaining_timeout);
783 
784  if (rc == 0) {
785  crm_err("Timed out (%d ms) while waiting for remote data",
786  remaining_timeout);
787  return FALSE;
788 
789  } else if (rc < 0) {
790  crm_debug("Wait for remote data aborted, will try again: %s "
791  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
792 
793  } else {
794  rc = crm_remote_recv_once(remote);
795  if (rc > 0) {
796  return TRUE;
797  } else if (rc == -EAGAIN) {
798  crm_trace("Still waiting for remote data");
799  } else if (rc < 0) {
800  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
801  pcmk_strerror(rc), rc);
802  }
803  }
804 
805  if (rc == -ENOTCONN) {
806  *disconnected = 1;
807  return FALSE;
808  }
809 
810  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
811  }
812 
813  return FALSE;
814 }
815 
816 struct tcp_async_cb_data {
817  gboolean success;
818  int sock;
819  void *userdata;
820  void (*callback) (void *userdata, int sock);
821  int timeout; /*ms */
822  time_t start;
823 };
824 
825 static gboolean
826 check_connect_finished(gpointer userdata)
827 {
828  struct tcp_async_cb_data *cb_data = userdata;
829  int cb_arg = 0; // socket fd on success, -errno on error
830  int sock = cb_data->sock;
831  int error = 0;
832 
833  fd_set rset, wset;
834  socklen_t len = sizeof(error);
835  struct timeval ts = { 0, };
836 
837  if (cb_data->success == TRUE) {
838  goto dispatch_done;
839  }
840 
841  FD_ZERO(&rset);
842  FD_SET(sock, &rset);
843  wset = rset;
844 
845  crm_trace("fd %d: checking to see if connect finished", sock);
846  cb_arg = select(sock + 1, &rset, &wset, NULL, &ts);
847 
848  if (cb_arg < 0) {
849  cb_arg = -errno;
850  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
851  /* reschedule if there is still time left */
852  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
853  goto reschedule;
854  } else {
855  cb_arg = -ETIMEDOUT;
856  }
857  }
858  crm_trace("fd %d: select failed %d connect dispatch ", sock, cb_arg);
859  goto dispatch_done;
860  } else if (cb_arg == 0) {
861  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
862  goto reschedule;
863  }
864  crm_debug("fd %d: timeout during select", sock);
865  cb_arg = -ETIMEDOUT;
866  goto dispatch_done;
867  } else {
868  crm_trace("fd %d: select returned success", sock);
869  cb_arg = 0;
870  }
871 
872  /* can we read or write to the socket now? */
873  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
874  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
875  cb_arg = -errno;
876  crm_trace("fd %d: call to getsockopt failed", sock);
877  goto dispatch_done;
878  }
879  if (error) {
880  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
881  cb_arg = -error;
882  goto dispatch_done;
883  }
884  } else {
885  crm_trace("neither read nor write set after select");
886  cb_arg = -EAGAIN;
887  goto dispatch_done;
888  }
889 
890  dispatch_done:
891  if (!cb_arg) {
892  crm_trace("fd %d: connected", sock);
893  /* Success, set the return code to the sock to report to the callback */
894  cb_arg = cb_data->sock;
895  cb_data->sock = 0;
896  } else {
897  close(sock);
898  }
899 
900  if (cb_data->callback) {
901  cb_data->callback(cb_data->userdata, cb_arg);
902  }
903  free(cb_data);
904  return FALSE;
905 
906  reschedule:
907 
908  /* will check again next interval */
909  return TRUE;
910 }
911 
912 static int
913 internal_tcp_connect_async(int sock,
914  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
915  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
916 {
917  int rc = 0;
918  int interval = 500;
919  int timer;
920  struct tcp_async_cb_data *cb_data = NULL;
921 
922  rc = crm_set_nonblocking(sock);
923  if (rc < 0) {
924  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
925  pcmk_strerror(rc), rc);
926  close(sock);
927  return -1;
928  }
929 
930  rc = connect(sock, addr, addrlen);
931  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
932  crm_perror(LOG_WARNING, "connect");
933  return -1;
934  }
935 
936  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
937  cb_data->userdata = userdata;
938  cb_data->callback = callback;
939  cb_data->sock = sock;
940  cb_data->timeout = timeout;
941  cb_data->start = time(NULL);
942 
943  if (rc == 0) {
944  /* The connect was successful immediately, we still return to mainloop
945  * and let this callback get called later. This avoids the user of this api
946  * to have to account for the fact the callback could be invoked within this
947  * function before returning. */
948  cb_data->success = TRUE;
949  interval = 1;
950  }
951 
952  /* Check connect finished is mostly doing a non-block poll on the socket
953  * to see if we can read/write to it. Once we can, the connect has completed.
954  * This method allows us to connect to the server without blocking mainloop.
955  *
956  * This is a poor man's way of polling to see when the connection finished.
957  * At some point we should figure out a way to use a mainloop fd callback for this.
958  * Something about the way mainloop is currently polling prevents this from working at the
959  * moment though. */
960  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
961  interval, sock);
962  timer = g_timeout_add(interval, check_connect_finished, cb_data);
963  if (timer_id) {
964  *timer_id = timer;
965  }
966 
967  return 0;
968 }
969 
970 static int
971 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
972 {
973  int rc = connect(sock, addr, addrlen);
974 
975  if (rc < 0) {
976  rc = -errno;
977  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
978  pcmk_strerror(rc), rc);
979  return rc;
980  }
981 
982  rc = crm_set_nonblocking(sock);
983  if (rc < 0) {
984  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
985  pcmk_strerror(rc), rc);
986  return rc;
987  }
988 
989  return pcmk_ok;
990 }
991 
/*!
 * \brief Connect to a host over TCP, trying each of its addresses in order
 *
 * With a callback, the connection is attempted asynchronously and the result
 * is reported through the callback; without one, the connection is made
 * before returning.
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Asynchronous attempts only: how long to keep
 *                       checking, in milliseconds
 * \param[out] timer_id  Asynchronous attempts only: if not NULL, set to the
 *                       ID of the timer that checks for completion
 * \param[in]  userdata  Data to pass to callback
 * \param[in]  callback  If not NULL, connect asynchronously and call this
 *                       when the attempt is resolved
 *
 * \return Socket file descriptor (connected, or with a connection in
 *         progress when a callback was given), or -ENOTCONN on failure
 */
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout,
                             int *timer_id, void *userdata,
                             void (*callback) (void *userdata, int sock))
{
    struct addrinfo hints = {
        .ai_flags = AI_CANONNAME,
        .ai_family = AF_UNSPEC,     /* Allow IPv4 or IPv6 */
        .ai_socktype = SOCK_STREAM,
    };
    struct addrinfo *results = NULL;
    const char *server = host;
    char addr_text[INET6_ADDRSTRLEN];
    int sock = -ENOTCONN;

    // Get host's IP address(es)
    int gai_rc = getaddrinfo(server, NULL, &hints, &results);

    if (gai_rc != 0) {
        crm_err("Unable to get IP address info for %s: %s",
                server, gai_strerror(gai_rc));

    } else if ((results == NULL) || (results->ai_addr == NULL)) {
        crm_err("Unable to get IP address info for %s: no result", server);

    } else {
        // getaddrinfo() returns a list of host's addresses, try them in order
        for (struct addrinfo *candidate = results; candidate != NULL;
             candidate = candidate->ai_next) {

            struct sockaddr *addr = candidate->ai_addr;

            if (addr == NULL) {
                continue;
            }

            if (candidate->ai_canonname) {
                server = results->ai_canonname;
            }
            crm_debug("Got canonical name %s for %s", server, host);

            sock = socket(candidate->ai_family, SOCK_STREAM, IPPROTO_TCP);
            if (sock == -1) {
                crm_perror(LOG_WARNING, "creating socket for connection to %s",
                           server);
                sock = -ENOTCONN;
                continue;
            }

            /* Set port appropriately for address family */
            /* (void*) casts avoid false-positive compiler alignment warnings */
            if (addr->sa_family != AF_INET6) {
                ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
            } else {
                ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
            }

            memset(addr_text, 0, sizeof(addr_text));
            crm_sockaddr2str(addr, addr_text);
            crm_info("Attempting TCP connection to %s:%d", addr_text, port);

            if (callback == NULL) {
                if (internal_tcp_connect(sock, candidate->ai_addr,
                                         candidate->ai_addrlen) == 0) {
                    break;      /* Success */
                }

            } else if (internal_tcp_connect_async(sock, candidate->ai_addr,
                                                  candidate->ai_addrlen,
                                                  timeout, timer_id, userdata,
                                                  callback) == 0) {
                /* Success for now, we'll hear back later in the callback */
                break;
            }

            /* This address did not work out; move on to the next one */
            close(sock);
            sock = -ENOTCONN;
        }
    }

    if (results != NULL) {
        freeaddrinfo(results);
    }
    return sock;
}
1088 
/*!
 * \brief Connect to a host over TCP, returning once the attempt is resolved
 *
 * \param[in] host  Name or address of the host to connect to
 * \param[in] port  TCP port to connect to
 *
 * \return Whatever crm_remote_tcp_connect_async() returns when called
 *         without a callback
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    /* Without a callback the connection is made synchronously, so the
     * timeout, timer ID and user data arguments play no part
     */
    const int no_timeout = -1;

    return crm_remote_tcp_connect_async(host, port, no_timeout, NULL, NULL,
                                        NULL);
}
1094 
/*!
 * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 *
 * \param[in]  sa  Socket address (struct sockaddr and family-specific kin)
 * \param[out] s   Where to write the text; must have room for at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note Any address family other than AF_INET and AF_INET6 yields the text
 *       "<invalid>".
 */
void
crm_sockaddr2str(void *sa, char *s)
{
    const struct sockaddr *generic = sa;

    if (generic->sa_family == AF_INET) {
        struct sockaddr_in *ipv4 = sa;

        inet_ntop(AF_INET, &(ipv4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (generic->sa_family == AF_INET6) {
        struct sockaddr_in6 *ipv6 = sa;

        inet_ntop(AF_INET6, &(ipv6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1123 
1124 int
1126 {
1127  int csock = 0;
1128  int rc = 0;
1129  unsigned laddr = 0;
1130  struct sockaddr_storage addr;
1131  char addr_str[INET6_ADDRSTRLEN];
1132 #ifdef TCP_USER_TIMEOUT
1133  int optval;
1134  long sbd_timeout = crm_get_sbd_timeout();
1135 #endif
1136 
1137  /* accept the connection */
1138  laddr = sizeof(addr);
1139  memset(&addr, 0, sizeof(addr));
1140  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1141  crm_sockaddr2str(&addr, addr_str);
1142  crm_info("New remote connection from %s", addr_str);
1143 
1144  if (csock == -1) {
1145  crm_err("accept socket failed");
1146  return -1;
1147  }
1148 
1149  rc = crm_set_nonblocking(csock);
1150  if (rc < 0) {
1151  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1152  pcmk_strerror(rc), rc);
1153  close(csock);
1154  return rc;
1155  }
1156 
1157 #ifdef TCP_USER_TIMEOUT
1158  if (sbd_timeout > 0) {
1159  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1160  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1161  &optval, sizeof(optval));
1162  if (rc < 0) {
1163  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1164  optval);
1165  close(csock);
1166  return rc;
1167  }
1168  }
1169 #endif
1170 
1171  return csock;
1172 }
1173 
1179 int
1181 {
1182  static int port = 0;
1183 
1184  if (port == 0) {
1185  const char *env = getenv("PCMK_remote_port");
1186 
1187  if (env) {
1188  errno = 0;
1189  port = strtol(env, NULL, 10);
1190  if (errno || (port < 1) || (port > 65535)) {
1191  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1192  env, DEFAULT_REMOTE_PORT);
1193  port = DEFAULT_REMOTE_PORT;
1194  }
1195  } else {
1196  port = DEFAULT_REMOTE_PORT;
1197  }
1198  }
1199  return port;
1200 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:41
const char * pcmk_strerror(int rc)
Definition: results.c:188
const char * bz2_strerror(int rc)
Definition: results.c:443
long crm_get_sbd_timeout(void)
Definition: watchdog.c:214
uint32_t payload_compressed
Definition: remote.c:151
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:1090
int crm_remote_accept(int ssock)
Definition: remote.c:1125
char * buffer
Definition: ipcs.h:39
#define pcmk_err_generic
Definition: results.h:60
int crm_parse_int(const char *text, const char *default_text)
Parse an integer value from a string.
Definition: strings.c:110
AIS_Host host
Definition: internal.h:86
struct crm_remote_s * remote
Definition: ipcs.h:91
uint32_t payload_uncompressed
Definition: remote.c:152
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:142
#define ENDIAN_LOCAL
Definition: remote.c:82
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:145
xmlNode * string2xml(const char *input)
Definition: xml.c:2058
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:49
uint32_t id
Definition: internal.h:82
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:538
#define crm_warn(fmt, args...)
Definition: logging.h:241
#define crm_debug(fmt, args...)
Definition: logging.h:245
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:481
void gnutls_session_t
Definition: cib_remote.c:42
#define crm_trace(fmt, args...)
Definition: logging.h:246
int crm_set_nonblocking(int fd)
Definition: io.c:486
#define __swab64(x)
Definition: remote.c:70
Wrappers for and extensions to libxml2.
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:602
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:1006
uint32_t payload_offset
Definition: remote.c:150
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:764
uint32_t size_total
Definition: remote.c:149
#define CRM_XS
Definition: logging.h:34
void crm_sockaddr2str(void *sa, char *s)
Convert an IP address (IPv4 or IPv6) to a string for logging.
Definition: remote.c:1106
#define X32T
Definition: config.h:636
#define __swab32(x)
Definition: remote.c:64
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:218
size_t buffer_size
Definition: ipcs.h:40
#define REMOTE_MSG_VERSION
Definition: remote.c:81
#define crm_err(fmt, args...)
Definition: logging.h:240
#define CRM_ASSERT(expr)
Definition: results.h:42
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3196
#define DIMOF(a)
Definition: crm.h:35
#define pcmk_ok
Definition: results.h:57
int tcp_socket
Definition: ipcs.h:43
#define crm_info(fmt, args...)
Definition: logging.h:243
uint32_t version
Definition: remote.c:146
uint64_t flags
Definition: remote.c:148
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1180
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:530
enum crm_proc_flag __attribute__