root/lib/common/remote.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. localized_remote_header
  2. send_tls
  3. send_plaintext
  4. remote_send_iovs
  5. pcmk__remote_send_xml
  6. pcmk__remote_message_xml
  7. get_remote_socket
  8. pcmk__remote_ready
  9. pcmk__read_available_remote_data
  10. pcmk__read_remote_message
  11. check_connect_finished
  12. connect_socket_retry
  13. connect_socket_once
  14. pcmk__connect_remote
  15. pcmk__sockaddr2str
  16. pcmk__accept_remote_connection
  17. crm_default_remote_port

   1 /*
   2  * Copyright 2008-2025 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 #include <crm/crm.h>
  12 
  13 #include <sys/param.h>
  14 #include <stdio.h>
  15 #include <sys/types.h>
  16 #include <sys/stat.h>
  17 #include <unistd.h>
  18 #include <sys/socket.h>
  19 #include <arpa/inet.h>
  20 #include <netinet/in.h>
  21 #include <netinet/ip.h>
  22 #include <netinet/tcp.h>
  23 #include <netdb.h>
  24 #include <stdlib.h>
  25 #include <errno.h>
  26 #include <inttypes.h>   // PRIx32
  27 
  28 #include <glib.h>
  29 #include <bzlib.h>
  30 
  31 #include <crm/common/ipc_internal.h>
  32 #include <crm/common/xml.h>
  33 #include <crm/common/mainloop.h>
  34 #include <crm/common/remote_internal.h>
  35 #include <crm/common/tls_internal.h>
  36 
  37 #include <gnutls/gnutls.h>
  38 
  39 /* Swab macros from linux/swab.h */
  40 #ifdef HAVE_LINUX_SWAB_H
  41 #  include <linux/swab.h>
  42 #else
  43 /*
  44  * casts are necessary for constants, because we never know how for sure
  45  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
  46  */
  47 #define __swab16(x) ((uint16_t)(                                      \
  48         (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) |                  \
  49         (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
  50 
  51 #define __swab32(x) ((uint32_t)(                                      \
  52         (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) |            \
  53         (((uint32_t)(x) & (uint32_t)0x0000ff00UL) <<  8) |            \
  54         (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >>  8) |            \
  55         (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
  56 
  57 #define __swab64(x) ((uint64_t)(                                      \
  58         (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) |   \
  59         (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) |   \
  60         (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) |   \
  61         (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) <<  8) |   \
  62         (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >>  8) |   \
  63         (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) |   \
  64         (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) |   \
  65         (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
  66 #endif
  67 
  68 #define REMOTE_MSG_VERSION 1
  69 #define ENDIAN_LOCAL 0xBADADBBD
  70 
  71 struct remote_header_v0 {
  72     uint32_t endian;    /* Detect messages from hosts with different endian-ness */
  73     uint32_t version;
  74     uint64_t id;
  75     uint64_t flags;
  76     uint32_t size_total;
  77     uint32_t payload_offset;
  78     uint32_t payload_compressed;
  79     uint32_t payload_uncompressed;
  80 
  81         /* New fields get added here */
  82 
  83 } __attribute__ ((packed));
  84 
  85 /*!
  86  * \internal
  87  * \brief Retrieve remote message header, in local endianness
  88  *
  89  * Return a pointer to the header portion of a remote connection's message
  90  * buffer, converting the header to local endianness if needed.
  91  *
  92  * \param[in,out] remote  Remote connection with new message
  93  *
  94  * \return Pointer to message header, localized if necessary
  95  */
  96 static struct remote_header_v0 *
  97 localized_remote_header(pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
  98 {
  99     struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
 100     if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
 101         return NULL;
 102 
 103     } else if(header->endian != ENDIAN_LOCAL) {
 104         uint32_t endian = __swab32(header->endian);
 105 
 106         CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
 107         if(endian != ENDIAN_LOCAL) {
 108             crm_err("Invalid message detected, endian mismatch: %" PRIx32
 109                     " is neither %" PRIx32 " nor the swab'd %" PRIx32,
 110                     ENDIAN_LOCAL, header->endian, endian);
 111             return NULL;
 112         }
 113 
 114         header->id = __swab64(header->id);
 115         header->flags = __swab64(header->flags);
 116         header->endian = __swab32(header->endian);
 117 
 118         header->version = __swab32(header->version);
 119         header->size_total = __swab32(header->size_total);
 120         header->payload_offset = __swab32(header->payload_offset);
 121         header->payload_compressed = __swab32(header->payload_compressed);
 122         header->payload_uncompressed = __swab32(header->payload_uncompressed);
 123     }
 124 
 125     return header;
 126 }
 127 
 128 // \return Standard Pacemaker return code
 129 static int
 130 send_tls(gnutls_session_t session, struct iovec *iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 131 {
 132     const char *unsent = iov->iov_base;
 133     size_t unsent_len = iov->iov_len;
 134     ssize_t gnutls_rc;
 135 
 136     if (unsent == NULL) {
 137         return EINVAL;
 138     }
 139 
 140     crm_trace("Sending TLS message of %zu bytes", unsent_len);
 141 
 142     while (true) {
 143         gnutls_rc = gnutls_record_send(session, unsent, unsent_len);
 144 
 145         if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
 146             crm_trace("Retrying to send %zu bytes remaining", unsent_len);
 147 
 148         } else if (gnutls_rc < 0) {
 149             // Caller can log as error if necessary
 150             crm_info("TLS connection terminated: %s " QB_XS " rc=%zd",
 151                      gnutls_strerror((int) gnutls_rc), gnutls_rc);
 152             return ECONNABORTED;
 153 
 154         } else if (gnutls_rc < unsent_len) {
 155             crm_trace("Sent %zd of %zu bytes remaining", gnutls_rc, unsent_len);
 156             unsent_len -= gnutls_rc;
 157             unsent += gnutls_rc;
 158         } else {
 159             crm_trace("Sent all %zd bytes remaining", gnutls_rc);
 160             break;
 161         }
 162     }
 163     return pcmk_rc_ok;
 164 }
 165 
 166 // \return Standard Pacemaker return code
 167 static int
 168 send_plaintext(int sock, struct iovec *iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 169 {
 170     const char *unsent = iov->iov_base;
 171     size_t unsent_len = iov->iov_len;
 172 
 173     if (unsent == NULL) {
 174         return EINVAL;
 175     }
 176 
 177     crm_debug("Sending plaintext message of %zu bytes to socket %d",
 178               unsent_len, sock);
 179     while (true) {
 180         ssize_t write_rc = write(sock, unsent, unsent_len);
 181 
 182         if (write_rc < 0) {
 183             int rc = errno;
 184 
 185             if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
 186                 crm_trace("Retrying to send %zu bytes remaining to socket %d",
 187                           unsent_len, sock);
 188                 continue;
 189             }
 190 
 191             // Caller can log as error if necessary
 192             crm_info("Could not send message: %s " QB_XS " rc=%d socket=%d",
 193                      pcmk_rc_str(rc), rc, sock);
 194             return rc;
 195 
 196         } else if (write_rc < unsent_len) {
 197             crm_trace("Sent %zd of %zu bytes remaining", write_rc, unsent_len);
 198             unsent += write_rc;
 199             unsent_len -= write_rc;
 200 
 201         } else {
 202             crm_trace("Sent all %zd bytes remaining: %.100s",
 203                       write_rc, (char *) (iov->iov_base));
 204             return pcmk_rc_ok;
 205         }
 206     }
 207 }
 208 
 209 // \return Standard Pacemaker return code
 210 static int
 211 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
     /* [previous][next][first][last][top][bottom][index][help] */
 212 {
 213     int rc = pcmk_rc_ok;
 214 
 215     for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
 216         if (remote->tls_session) {
 217             rc = send_tls(remote->tls_session, &(iov[lpc]));
 218             continue;
 219         }
 220         if (remote->tcp_socket >= 0) {
 221             rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
 222         } else {
 223             rc = ESOCKTNOSUPPORT;
 224         }
 225     }
 226     return rc;
 227 }
 228 
 229 /*!
 230  * \internal
 231  * \brief Send an XML message over a Pacemaker Remote connection
 232  *
 233  * \param[in,out] remote  Pacemaker Remote connection to use
 234  * \param[in]     msg     XML to send
 235  *
 236  * \return Standard Pacemaker return code
 237  */
 238 int
 239 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 240 {
 241     int rc = pcmk_rc_ok;
 242     static uint64_t id = 0;
 243     GString *xml_text = NULL;
 244 
 245     struct iovec iov[2];
 246     struct remote_header_v0 *header;
 247 
 248     CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
 249 
 250     xml_text = g_string_sized_new(1024);
 251     pcmk__xml_string(msg, 0, xml_text, 0);
 252     CRM_CHECK(xml_text->len > 0,
 253               g_string_free(xml_text, TRUE); return EINVAL);
 254 
 255     header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
 256 
 257     iov[0].iov_base = header;
 258     iov[0].iov_len = sizeof(struct remote_header_v0);
 259 
 260     iov[1].iov_len = 1 + xml_text->len;
 261     iov[1].iov_base = g_string_free(xml_text, FALSE);
 262 
 263     id++;
 264     header->id = id;
 265     header->endian = ENDIAN_LOCAL;
 266     header->version = REMOTE_MSG_VERSION;
 267     header->payload_offset = iov[0].iov_len;
 268     header->payload_uncompressed = iov[1].iov_len;
 269     header->size_total = iov[0].iov_len + iov[1].iov_len;
 270 
 271     rc = remote_send_iovs(remote, iov, 2);
 272     if (rc != pcmk_rc_ok) {
 273         crm_err("Could not send remote message: %s " QB_XS " rc=%d",
 274                 pcmk_rc_str(rc), rc);
 275     }
 276 
 277     free(iov[0].iov_base);
 278     g_free((gchar *) iov[1].iov_base);
 279     return rc;
 280 }
 281 
 282 /*!
 283  * \internal
 284  * \brief Obtain the XML from the currently buffered remote connection message
 285  *
 286  * \param[in,out] remote  Remote connection possibly with message available
 287  *
 288  * \return Newly allocated XML object corresponding to message data, or NULL
 289  * \note This effectively removes the message from the connection buffer.
 290  */
 291 xmlNode *
 292 pcmk__remote_message_xml(pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 293 {
 294     xmlNode *xml = NULL;
 295     struct remote_header_v0 *header = localized_remote_header(remote);
 296 
 297     if (header == NULL) {
 298         return NULL;
 299     }
 300 
 301     /* Support compression on the receiving end now, in case we ever want to add it later */
 302     if (header->payload_compressed) {
 303         int rc = 0;
 304         unsigned int size_u = 1 + header->payload_uncompressed;
 305         char *uncompressed =
 306             pcmk__assert_alloc(1, header->payload_offset + size_u);
 307 
 308         crm_trace("Decompressing message data %d bytes into %d bytes",
 309                  header->payload_compressed, size_u);
 310 
 311         rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
 312                                         remote->buffer + header->payload_offset,
 313                                         header->payload_compressed, 1, 0);
 314         rc = pcmk__bzlib2rc(rc);
 315 
 316         if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
 317             crm_warn("Couldn't decompress v%d message, we only understand v%d",
 318                      header->version, REMOTE_MSG_VERSION);
 319             free(uncompressed);
 320             return NULL;
 321 
 322         } else if (rc != pcmk_rc_ok) {
 323             crm_err("Decompression failed: %s " QB_XS " rc=%d",
 324                     pcmk_rc_str(rc), rc);
 325             free(uncompressed);
 326             return NULL;
 327         }
 328 
 329         pcmk__assert(size_u == header->payload_uncompressed);
 330 
 331         memcpy(uncompressed, remote->buffer, header->payload_offset);       /* Preserve the header */
 332         remote->buffer_size = header->payload_offset + size_u;
 333 
 334         free(remote->buffer);
 335         remote->buffer = uncompressed;
 336         header = localized_remote_header(remote);
 337     }
 338 
 339     /* take ownership of the buffer */
 340     remote->buffer_offset = 0;
 341 
 342     CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
 343 
 344     xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
 345     if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
 346         crm_warn("Couldn't parse v%d message, we only understand v%d",
 347                  header->version, REMOTE_MSG_VERSION);
 348 
 349     } else if (xml == NULL) {
 350         crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
 351     }
 352 
 353     crm_log_xml_trace(xml, "[remote msg]");
 354     return xml;
 355 }
 356 
 357 static int
 358 get_remote_socket(const pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 359 {
 360     if (remote->tls_session != NULL) {
 361         return pcmk__tls_get_client_sock(remote);
 362     }
 363     if (remote->tcp_socket >= 0) {
 364         return remote->tcp_socket;
 365     }
 366     crm_err("Remote connection type undetermined (bug?)");
 367     return -1;
 368 }
 369 
 370 /*!
 371  * \internal
 372  * \brief Wait for a remote session to have data to read
 373  *
 374  * \param[in] remote      Connection to check
 375  * \param[in] timeout_ms  Maximum time (in ms) to wait
 376  *
 377  * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
 378  *         there is data ready to be read, and ETIME if there is no data within
 379  *         the specified timeout)
 380  */
 381 int
 382 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
     /* [previous][next][first][last][top][bottom][index][help] */
 383 {
 384     struct pollfd fds = { 0, };
 385     int sock = -1;
 386     int rc = 0;
 387     time_t start;
 388     int timeout = timeout_ms;
 389 
 390     sock = get_remote_socket(remote);
 391     if (sock < 0) {
 392         crm_trace("No longer connected");
 393         return ENOTCONN;
 394     }
 395 
 396     start = time(NULL);
 397     errno = 0;
 398     do {
 399         fds.fd = sock;
 400         fds.events = POLLIN;
 401 
 402         /* If we got an EINTR while polling, and we have a
 403          * specific timeout we are trying to honor, attempt
 404          * to adjust the timeout to the closest second. */
 405         if (errno == EINTR && (timeout > 0)) {
 406             timeout = timeout_ms - ((time(NULL) - start) * 1000);
 407             if (timeout < 1000) {
 408                 timeout = 1000;
 409             }
 410         }
 411 
 412         rc = poll(&fds, 1, timeout);
 413     } while (rc < 0 && errno == EINTR);
 414 
 415     if (rc < 0) {
 416         return errno;
 417     }
 418     return (rc == 0)? ETIME : pcmk_rc_ok;
 419 }
 420 
 421 /*!
 422  * \internal
 423  * \brief Read bytes from non-blocking remote connection
 424  *
 425  * \param[in,out] remote  Remote connection to read
 426  *
 427  * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
 428  *         a full message has been received, or EAGAIN for a partial message)
 429  * \note Use only with non-blocking sockets after polling the socket.
 430  * \note This function will return when the socket read buffer is empty or an
 431  *       error is encountered.
 432  */
 433 int
 434 pcmk__read_available_remote_data(pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 435 {
 436     int rc = pcmk_rc_ok;
 437     size_t read_len = sizeof(struct remote_header_v0);
 438     struct remote_header_v0 *header = localized_remote_header(remote);
 439     ssize_t read_rc;
 440 
 441     if(header) {
 442         /* Stop at the end of the current message */
 443         read_len = header->size_total;
 444     }
 445 
 446     /* automatically grow the buffer when needed */
 447     if(remote->buffer_size < read_len) {
 448         remote->buffer_size = 2 * read_len;
 449         crm_trace("Expanding buffer to %zu bytes", remote->buffer_size);
 450         remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
 451     }
 452 
 453     if (remote->tls_session) {
 454         read_rc = gnutls_record_recv(remote->tls_session,
 455                                      remote->buffer + remote->buffer_offset,
 456                                      remote->buffer_size - remote->buffer_offset);
 457         if (read_rc == GNUTLS_E_INTERRUPTED) {
 458             rc = EINTR;
 459         } else if (read_rc == GNUTLS_E_AGAIN) {
 460             rc = EAGAIN;
 461         } else if (read_rc < 0) {
 462             crm_debug("TLS receive failed: %s (%zd)",
 463                       gnutls_strerror((int) read_rc), read_rc);
 464             rc = EIO;
 465         }
 466     } else if (remote->tcp_socket >= 0) {
 467         read_rc = read(remote->tcp_socket,
 468                        remote->buffer + remote->buffer_offset,
 469                        remote->buffer_size - remote->buffer_offset);
 470         if (read_rc < 0) {
 471             rc = errno;
 472         }
 473     } else {
 474         crm_err("Remote connection type undetermined (bug?)");
 475         return ESOCKTNOSUPPORT;
 476     }
 477 
 478     /* process any errors. */
 479     if (read_rc > 0) {
 480         remote->buffer_offset += read_rc;
 481         /* always null terminate buffer, the +1 to alloc always allows for this. */
 482         remote->buffer[remote->buffer_offset] = '\0';
 483         crm_trace("Received %zd more bytes (%zu total)",
 484                   read_rc, remote->buffer_offset);
 485 
 486     } else if (read_rc == 0) {
 487         crm_debug("End of remote data encountered after %zu bytes",
 488                   remote->buffer_offset);
 489         return ENOTCONN;
 490 
 491     } else if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
 492         crm_trace("No data available for non-blocking remote read: %s (%d)",
 493                   pcmk_rc_str(rc), rc);
 494 
 495     } else {
 496         crm_debug("Error receiving remote data after %zu bytes: %s (%d)",
 497                   remote->buffer_offset, pcmk_rc_str(rc), rc);
 498         return ENOTCONN;
 499     }
 500 
 501     header = localized_remote_header(remote);
 502     if(header) {
 503         if(remote->buffer_offset < header->size_total) {
 504             crm_trace("Read partial remote message (%zu of %" PRIu32 " bytes)",
 505                       remote->buffer_offset, header->size_total);
 506         } else {
 507             crm_trace("Read full remote message of %zu bytes",
 508                       remote->buffer_offset);
 509             return pcmk_rc_ok;
 510         }
 511     }
 512 
 513     return EAGAIN;
 514 }
 515 
 516 /*!
 517  * \internal
 518  * \brief Read one message from a remote connection
 519  *
 520  * \param[in,out] remote      Remote connection to read
 521  * \param[in]     timeout_ms  Fail if message not read in this many milliseconds
 522  *                            (10s will be used if 0, and 60s if negative)
 523  *
 524  * \return Standard Pacemaker return code
 525  */
 526 int
 527 pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
     /* [previous][next][first][last][top][bottom][index][help] */
 528 {
 529     int rc = pcmk_rc_ok;
 530     time_t start = time(NULL);
 531     int remaining_timeout = 0;
 532 
 533     if (timeout_ms == 0) {
 534         timeout_ms = 10000;
 535     } else if (timeout_ms < 0) {
 536         timeout_ms = 60000;
 537     }
 538 
 539     remaining_timeout = timeout_ms;
 540     while (remaining_timeout > 0) {
 541 
 542         crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
 543                   remaining_timeout, timeout_ms);
 544         rc = pcmk__remote_ready(remote, remaining_timeout);
 545 
 546         if (rc == ETIME) {
 547             crm_err("Timed out (%d ms) while waiting for remote data",
 548                     remaining_timeout);
 549             return rc;
 550 
 551         } else if (rc != pcmk_rc_ok) {
 552             crm_debug("Wait for remote data aborted (will retry): %s "
 553                       QB_XS " rc=%d", pcmk_rc_str(rc), rc);
 554 
 555         } else {
 556             rc = pcmk__read_available_remote_data(remote);
 557             if (rc == pcmk_rc_ok) {
 558                 return rc;
 559             } else if (rc == EAGAIN) {
 560                 crm_trace("Waiting for more remote data");
 561             } else {
 562                 crm_debug("Could not receive remote data: %s " QB_XS " rc=%d",
 563                           pcmk_rc_str(rc), rc);
 564             }
 565         }
 566 
 567         // Don't waste time retrying after fatal errors
 568         if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
 569             return rc;
 570         }
 571 
 572         remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
 573     }
 574     return ETIME;
 575 }
 576 
 577 struct tcp_async_cb_data {
 578     int sock;
 579     int timeout_ms;
 580     time_t start;
 581     void *userdata;
 582     void (*callback) (void *userdata, int rc, int sock);
 583 };
 584 
 585 // \return TRUE if timer should be rescheduled, FALSE otherwise
 586 static gboolean
 587 check_connect_finished(gpointer userdata)
     /* [previous][next][first][last][top][bottom][index][help] */
 588 {
 589     struct tcp_async_cb_data *cb_data = userdata;
 590     int rc;
 591 
 592     fd_set rset, wset;
 593     struct timeval ts = { 0, };
 594 
 595     if (cb_data->start == 0) {
 596         // Last connect() returned success immediately
 597         rc = pcmk_rc_ok;
 598         goto dispatch_done;
 599     }
 600 
 601     // If the socket is ready for reading or writing, the connect succeeded
 602     FD_ZERO(&rset);
 603     FD_SET(cb_data->sock, &rset);
 604     wset = rset;
 605     rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
 606 
 607     if (rc < 0) { // select() error
 608         rc = errno;
 609         if ((rc == EINTR) || (rc == EAGAIN)) {
 610             if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
 611                 return TRUE; // There is time left, so reschedule timer
 612             } else {
 613                 rc = ETIMEDOUT;
 614             }
 615         }
 616         crm_trace("Could not check socket %d for connection success: %s (%d)",
 617                   cb_data->sock, pcmk_rc_str(rc), rc);
 618 
 619     } else if (rc == 0) { // select() timeout
 620         if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
 621             return TRUE; // There is time left, so reschedule timer
 622         }
 623         crm_debug("Timed out while waiting for socket %d connection success",
 624                   cb_data->sock);
 625         rc = ETIMEDOUT;
 626 
 627     // select() returned number of file descriptors that are ready
 628 
 629     } else if (FD_ISSET(cb_data->sock, &rset)
 630                || FD_ISSET(cb_data->sock, &wset)) {
 631 
 632         // The socket is ready; check it for connection errors
 633         int error = 0;
 634         socklen_t len = sizeof(error);
 635 
 636         if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
 637             rc = errno;
 638             crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
 639                       cb_data->sock, pcmk_rc_str(rc), rc);
 640         } else if (error != 0) {
 641             rc = error;
 642             crm_trace("Socket %d connected with error: %s (%d)",
 643                       cb_data->sock, pcmk_rc_str(rc), rc);
 644         } else {
 645             rc = pcmk_rc_ok;
 646         }
 647 
 648     } else { // Should not be possible
 649         crm_trace("select() succeeded, but socket %d not in resulting "
 650                   "read/write sets", cb_data->sock);
 651         rc = EAGAIN;
 652     }
 653 
 654   dispatch_done:
 655     if (rc == pcmk_rc_ok) {
 656         crm_trace("Socket %d is connected", cb_data->sock);
 657     } else {
 658         close(cb_data->sock);
 659         cb_data->sock = -1;
 660     }
 661 
 662     if (cb_data->callback) {
 663         cb_data->callback(cb_data->userdata, rc, cb_data->sock);
 664     }
 665     free(cb_data);
 666     return FALSE; // Do not reschedule timer
 667 }
 668 
 669 /*!
 670  * \internal
 671  * \brief Attempt to connect socket, calling callback when done
 672  *
 673  * Set a given socket non-blocking, then attempt to connect to it,
 674  * retrying periodically until success or a timeout is reached.
 675  * Call a caller-supplied callback function when completed.
 676  *
 677  * \param[in]  sock        Newly created socket
 678  * \param[in]  addr        Socket address information for connect
 679  * \param[in]  addrlen     Size of socket address information in bytes
 680  * \param[in]  timeout_ms  Fail if not connected within this much time
 681  * \param[out] timer_id    If not NULL, store retry timer ID here
 682  * \param[in]  userdata    User data to pass to callback
 683  * \param[in]  callback    Function to call when connection attempt completes
 684  *
 685  * \return Standard Pacemaker return code
 686  */
 687 static int
 688 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
     /* [previous][next][first][last][top][bottom][index][help] */
 689                      int timeout_ms, int *timer_id, void *userdata,
 690                      void (*callback) (void *userdata, int rc, int sock))
 691 {
 692     int rc = 0;
 693     int interval = 500;
 694     int timer;
 695     struct tcp_async_cb_data *cb_data = NULL;
 696 
 697     rc = pcmk__set_nonblocking(sock);
 698     if (rc != pcmk_rc_ok) {
 699         crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
 700                  pcmk_rc_str(rc), rc);
 701         return rc;
 702     }
 703 
 704     rc = connect(sock, addr, addrlen);
 705     if (rc < 0) {
 706         rc = errno;
 707         switch (rc) {
 708             case EINTR:
 709             case EINPROGRESS:
 710             case EAGAIN:
 711                 break;
 712 
 713             default:
 714                 crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
 715                          pcmk_rc_str(rc), rc);
 716                 return rc;
 717         }
 718     }
 719 
 720     cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
 721     cb_data->userdata = userdata;
 722     cb_data->callback = callback;
 723     cb_data->sock = sock;
 724     cb_data->timeout_ms = timeout_ms;
 725 
 726     if (rc == 0) {
 727         /* The connect was successful immediately, we still return to mainloop
 728          * and let this callback get called later. This avoids the user of this api
 729          * to have to account for the fact the callback could be invoked within this
 730          * function before returning. */
 731         cb_data->start = 0;
 732         interval = 1;
 733     } else {
 734         cb_data->start = time(NULL);
 735     }
 736 
 737     /* This timer function does a non-blocking poll on the socket to see if we
 738      * can use it. Once we can, the connect has completed. This method allows us
 739      * to connect without blocking the mainloop.
 740      *
 741      * @TODO Use a mainloop fd callback for this instead of polling. Something
 742      *       about the way mainloop is currently polling prevents this from
 743      *       working at the moment though. (See connect(2) regarding EINPROGRESS
 744      *       for possible new handling needed.)
 745      */
 746     crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
 747               interval, sock);
 748     timer = pcmk__create_timer(interval, check_connect_finished, cb_data);
 749     if (timer_id) {
 750         *timer_id = timer;
 751     }
 752 
 753     return pcmk_rc_ok;
 754 }
 755 
 756 /*!
 757  * \internal
 758  * \brief Attempt once to connect socket and set it non-blocking
 759  *
 760  * \param[in]  sock        Newly created socket
 761  * \param[in]  addr        Socket address information for connect
 762  * \param[in]  addrlen     Size of socket address information in bytes
 763  *
 764  * \return Standard Pacemaker return code
 765  */
 766 static int
 767 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
     /* [previous][next][first][last][top][bottom][index][help] */
 768 {
 769     int rc = connect(sock, addr, addrlen);
 770 
 771     if (rc < 0) {
 772         rc = errno;
 773         crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
 774                  pcmk_rc_str(rc), rc);
 775         return rc;
 776     }
 777 
 778     rc = pcmk__set_nonblocking(sock);
 779     if (rc != pcmk_rc_ok) {
 780         crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
 781                  pcmk_rc_str(rc), rc);
 782         return rc;
 783     }
 784 
 785     return pcmk_ok;
 786 }
 787 
/*!
 * \internal
 * \brief Connect to server at specified TCP port
 *
 * \param[in]  host      Name of server to connect to
 * \param[in]  port      Server port to connect to
 * \param[in]  timeout   If asynchronous, fail if not connected in this time
 * \param[out] timer_id  If asynchronous and this is non-NULL, retry timer ID
 *                       will be put here (for ease of cancelling by caller)
 * \param[out] sock_fd   Where to store socket file descriptor
 * \param[in]  userdata  If asynchronous, data to pass to callback
 * \param[in]  callback  If NULL, attempt a single synchronous connection,
 *                       otherwise retry asynchronously then call this
 *
 * \return Standard Pacemaker return code
 */
 804 int
 805 pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
     /* [previous][next][first][last][top][bottom][index][help] */
 806                      int *sock_fd, void *userdata,
 807                      void (*callback) (void *userdata, int rc, int sock))
 808 {
 809     char buffer[INET6_ADDRSTRLEN];
 810     struct addrinfo *res = NULL;
 811     struct addrinfo *rp = NULL;
 812     struct addrinfo hints;
 813     const char *server = host;
 814     int rc;
 815     int sock = -1;
 816 
 817     CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);
 818 
 819     // Get host's IP address(es)
 820     memset(&hints, 0, sizeof(struct addrinfo));
 821     hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
 822     hints.ai_socktype = SOCK_STREAM;
 823     hints.ai_flags = AI_CANONNAME;
 824 
 825     rc = getaddrinfo(server, NULL, &hints, &res);
 826     rc = pcmk__gaierror2rc(rc);
 827 
 828     if (rc != pcmk_rc_ok) {
 829         crm_err("Unable to get IP address info for %s: %s",
 830                 server, pcmk_rc_str(rc));
 831         goto async_cleanup;
 832     }
 833 
 834     if (!res || !res->ai_addr) {
 835         crm_err("Unable to get IP address info for %s: no result", server);
 836         rc = ENOTCONN;
 837         goto async_cleanup;
 838     }
 839 
 840     // getaddrinfo() returns a list of host's addresses, try them in order
 841     for (rp = res; rp != NULL; rp = rp->ai_next) {
 842         struct sockaddr *addr = rp->ai_addr;
 843 
 844         if (!addr) {
 845             continue;
 846         }
 847 
 848         if (rp->ai_canonname) {
 849             server = res->ai_canonname;
 850         }
 851         crm_debug("Got canonical name %s for %s", server, host);
 852 
 853         sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
 854         if (sock == -1) {
 855             rc = errno;
 856             crm_warn("Could not create socket for remote connection to %s:%d: "
 857                      "%s " QB_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
 858             continue;
 859         }
 860 
 861         /* Set port appropriately for address family */
 862         /* (void*) casts avoid false-positive compiler alignment warnings */
 863         if (addr->sa_family == AF_INET6) {
 864             ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
 865         } else {
 866             ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
 867         }
 868 
 869         memset(buffer, 0, PCMK__NELEM(buffer));
 870         pcmk__sockaddr2str(addr, buffer);
 871         crm_info("Attempting remote connection to %s:%d", buffer, port);
 872 
 873         if (callback) {
 874             if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
 875                                      timer_id, userdata, callback) == pcmk_rc_ok) {
 876                 goto async_cleanup; /* Success for now, we'll hear back later in the callback */
 877             }
 878 
 879         } else if (connect_socket_once(sock, rp->ai_addr,
 880                                        rp->ai_addrlen) == pcmk_rc_ok) {
 881             break;          /* Success */
 882         }
 883 
 884         // Connect failed
 885         close(sock);
 886         sock = -1;
 887         rc = ENOTCONN;
 888     }
 889 
 890 async_cleanup:
 891 
 892     if (res) {
 893         freeaddrinfo(res);
 894     }
 895     *sock_fd = sock;
 896     return rc;
 897 }
 898 
 899 /*!
 900  * \internal
 901  * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 902  *
 903  * \param[in]  sa  Socket address for IP
 904  * \param[out] s   Storage for at least INET6_ADDRSTRLEN bytes
 905  *
 906  * \note sa The socket address can be a pointer to struct sockaddr_in (IPv4),
 907  *          struct sockaddr_in6 (IPv6) or struct sockaddr_storage (either),
 908  *          as long as its sa_family member is set correctly.
 909  */
 910 void
 911 pcmk__sockaddr2str(const void *sa, char *s)
     /* [previous][next][first][last][top][bottom][index][help] */
 912 {
 913     switch (((const struct sockaddr *) sa)->sa_family) {
 914         case AF_INET:
 915             inet_ntop(AF_INET, &(((const struct sockaddr_in *) sa)->sin_addr),
 916                       s, INET6_ADDRSTRLEN);
 917             break;
 918 
 919         case AF_INET6:
 920             inet_ntop(AF_INET6,
 921                       &(((const struct sockaddr_in6 *) sa)->sin6_addr),
 922                       s, INET6_ADDRSTRLEN);
 923             break;
 924 
 925         default:
 926             strcpy(s, "<invalid>");
 927     }
 928 }
 929 
 930 /*!
 931  * \internal
 932  * \brief Accept a client connection on a remote server socket
 933  *
 934  * \param[in]  ssock  Server socket file descriptor being listened on
 935  * \param[out] csock  Where to put new client socket's file descriptor
 936  *
 937  * \return Standard Pacemaker return code
 938  */
 939 int
 940 pcmk__accept_remote_connection(int ssock, int *csock)
     /* [previous][next][first][last][top][bottom][index][help] */
 941 {
 942     int rc;
 943     struct sockaddr_storage addr;
 944     socklen_t laddr = sizeof(addr);
 945     char addr_str[INET6_ADDRSTRLEN];
 946 #ifdef TCP_USER_TIMEOUT
 947     long sbd_timeout = 0;
 948 #endif
 949 
 950     /* accept the connection */
 951     memset(&addr, 0, sizeof(addr));
 952     *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
 953     if (*csock == -1) {
 954         rc = errno;
 955         crm_err("Could not accept remote client connection: %s "
 956                 QB_XS " rc=%d", pcmk_rc_str(rc), rc);
 957         return rc;
 958     }
 959     pcmk__sockaddr2str(&addr, addr_str);
 960     crm_info("Accepted new remote client connection from %s", addr_str);
 961 
 962     rc = pcmk__set_nonblocking(*csock);
 963     if (rc != pcmk_rc_ok) {
 964         crm_err("Could not set socket non-blocking: %s " QB_XS " rc=%d",
 965                 pcmk_rc_str(rc), rc);
 966         close(*csock);
 967         *csock = -1;
 968         return rc;
 969     }
 970 
 971 #ifdef TCP_USER_TIMEOUT
 972     sbd_timeout = pcmk__get_sbd_watchdog_timeout();
 973     if (sbd_timeout > 0) {
 974         // Time to fail and retry before watchdog
 975         long half = sbd_timeout / 2;
 976         unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
 977 
 978         rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
 979                         &optval, sizeof(optval));
 980         if (rc < 0) {
 981             rc = errno;
 982             crm_err("Could not set TCP timeout to %d ms on remote connection: "
 983                     "%s " QB_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
 984             close(*csock);
 985             *csock = -1;
 986             return rc;
 987         }
 988     }
 989 #endif
 990 
 991     return rc;
 992 }
 993 
 994 /*!
 995  * \brief Get the default remote connection TCP port on this host
 996  *
 997  * \return Remote connection TCP port number
 998  */
 999 int
1000 crm_default_remote_port(void)
     /* [previous][next][first][last][top][bottom][index][help] */
1001 {
1002     static int port = 0;
1003 
1004     if (port == 0) {
1005         const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1006 
1007         if (env) {
1008             errno = 0;
1009             port = strtol(env, NULL, 10);
1010             if (errno || (port < 1) || (port > 65535)) {
1011                 crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1012                          " has invalid value '%s', using %d instead",
1013                          env, DEFAULT_REMOTE_PORT);
1014                 port = DEFAULT_REMOTE_PORT;
1015             }
1016         } else {
1017             port = DEFAULT_REMOTE_PORT;
1018         }
1019     }
1020     return port;
1021 }

/* [previous][next][first][last][top][bottom][index][help] */