root/lib/common/remote.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. localized_remote_header
  2. send_tls
  3. send_plaintext
  4. remote_send_iovs
  5. pcmk__remote_send_xml
  6. pcmk__remote_message_xml
  7. get_remote_socket
  8. pcmk__remote_ready
  9. pcmk__read_available_remote_data
  10. pcmk__read_remote_message
  11. check_connect_finished
  12. connect_socket_retry
  13. connect_socket_once
  14. pcmk__connect_remote
  15. pcmk__sockaddr2str
  16. pcmk__accept_remote_connection
  17. crm_default_remote_port

   1 /*
   2  * Copyright 2008-2024 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 #include <crm/crm.h>
  12 
  13 #include <sys/param.h>
  14 #include <stdio.h>
  15 #include <sys/types.h>
  16 #include <sys/stat.h>
  17 #include <unistd.h>
  18 #include <sys/socket.h>
  19 #include <arpa/inet.h>
  20 #include <netinet/in.h>
  21 #include <netinet/ip.h>
  22 #include <netinet/tcp.h>
  23 #include <netdb.h>
  24 #include <stdlib.h>
  25 #include <errno.h>
  26 #include <inttypes.h>   // PRIx32
  27 
  28 #include <glib.h>
  29 #include <bzlib.h>
  30 
  31 #include <crm/common/ipc_internal.h>
  32 #include <crm/common/xml.h>
  33 #include <crm/common/mainloop.h>
  34 #include <crm/common/remote_internal.h>
  35 #include <crm/common/tls_internal.h>
  36 
  37 #include <gnutls/gnutls.h>
  38 
  /* Byte-swapping helpers, named after the macros in linux/swab.h */
  #ifdef HAVE_LINUX_SWAB_H
  #  include <linux/swab.h>
  #else
  /*
   * Fallback definitions for when linux/swab.h is not available.
   *
   * The argument is always cast to the fixed-width type before use, because we
   * never know for sure (at least not in a portable way) how U/UL/ULL constants
   * map to 16-, 32-, and 64-bit types.
   *
   * Each macro evaluates its argument more than once, so do not pass an
   * expression with side effects.
   */
  #define __swab16(x) ((uint16_t)(                                      \
          (((uint16_t)(x) << 8) & (uint16_t)0xff00U) |                  \
          (((uint16_t)(x) >> 8) & (uint16_t)0x00ffU)))

  #define __swab32(x) ((uint32_t)(                                      \
          (((uint32_t)(x) << 24) & (uint32_t)0xff000000UL) |            \
          (((uint32_t)(x) <<  8) & (uint32_t)0x00ff0000UL) |            \
          (((uint32_t)(x) >>  8) & (uint32_t)0x0000ff00UL) |            \
          (((uint32_t)(x) >> 24) & (uint32_t)0x000000ffUL)))

  #define __swab64(x) ((uint64_t)(                                      \
          (((uint64_t)(x) << 56) & (uint64_t)0xff00000000000000ULL) |   \
          (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) |   \
          (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) |   \
          (((uint64_t)(x) <<  8) & (uint64_t)0x000000ff00000000ULL) |   \
          (((uint64_t)(x) >>  8) & (uint64_t)0x00000000ff000000ULL) |   \
          (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) |   \
          (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) |   \
          (((uint64_t)(x) >> 56) & (uint64_t)0x00000000000000ffULL)))
  #endif
  67 
  // Message format version written into the header of every message we send
  #define REMOTE_MSG_VERSION 1

  // Marker written into each header so a receiver can detect a byte-order mismatch
  #define ENDIAN_LOCAL 0xBADADBBD

  /*
   * Header that precedes the payload of every remote message. The structure is
   * packed, so its in-memory layout has no padding between fields.
   */
  struct remote_header_v0 {
      /* Detect messages from hosts with different endian-ness */
      uint32_t endian;

      /* Message format version (senders in this file use REMOTE_MSG_VERSION) */
      uint32_t version;

      /* Message ID (senders in this file use an incrementing counter) */
      uint64_t id;

      uint64_t flags;

      /* Size in bytes of the whole message, header included */
      uint32_t size_total;

      /* Offset in bytes from the start of the message to the payload */
      uint32_t payload_offset;

      /* Size in bytes of the compressed payload (0 if it is not compressed) */
      uint32_t payload_compressed;

      /* Size in bytes of the payload when not compressed */
      uint32_t payload_uncompressed;

      /* New fields get added here */

  } __attribute__ ((packed));
  84 
  85 /*!
  86  * \internal
  87  * \brief Retrieve remote message header, in local endianness
  88  *
  89  * Return a pointer to the header portion of a remote connection's message
  90  * buffer, converting the header to local endianness if needed.
  91  *
  92  * \param[in,out] remote  Remote connection with new message
  93  *
  94  * \return Pointer to message header, localized if necessary
  95  */
  96 static struct remote_header_v0 *
  97 localized_remote_header(pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
  98 {
  99     struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
 100     if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
 101         return NULL;
 102 
 103     } else if(header->endian != ENDIAN_LOCAL) {
 104         uint32_t endian = __swab32(header->endian);
 105 
 106         CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
 107         if(endian != ENDIAN_LOCAL) {
 108             crm_err("Invalid message detected, endian mismatch: %" PRIx32
 109                     " is neither %" PRIx32 " nor the swab'd %" PRIx32,
 110                     ENDIAN_LOCAL, header->endian, endian);
 111             return NULL;
 112         }
 113 
 114         header->id = __swab64(header->id);
 115         header->flags = __swab64(header->flags);
 116         header->endian = __swab32(header->endian);
 117 
 118         header->version = __swab32(header->version);
 119         header->size_total = __swab32(header->size_total);
 120         header->payload_offset = __swab32(header->payload_offset);
 121         header->payload_compressed = __swab32(header->payload_compressed);
 122         header->payload_uncompressed = __swab32(header->payload_uncompressed);
 123     }
 124 
 125     return header;
 126 }
 127 
 128 // \return Standard Pacemaker return code
 129 static int
 130 send_tls(gnutls_session_t session, struct iovec *iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 131 {
 132     const char *unsent = iov->iov_base;
 133     size_t unsent_len = iov->iov_len;
 134     ssize_t gnutls_rc;
 135 
 136     if (unsent == NULL) {
 137         return EINVAL;
 138     }
 139 
 140     crm_trace("Sending TLS message of %llu bytes",
 141               (unsigned long long) unsent_len);
 142     while (true) {
 143         gnutls_rc = gnutls_record_send(session, unsent, unsent_len);
 144 
 145         if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
 146             crm_trace("Retrying to send %llu bytes remaining",
 147                       (unsigned long long) unsent_len);
 148 
 149         } else if (gnutls_rc < 0) {
 150             // Caller can log as error if necessary
 151             crm_info("TLS connection terminated: %s " QB_XS " rc=%lld",
 152                      gnutls_strerror((int) gnutls_rc),
 153                      (long long) gnutls_rc);
 154             return ECONNABORTED;
 155 
 156         } else if (gnutls_rc < unsent_len) {
 157             crm_trace("Sent %lld of %llu bytes remaining",
 158                       (long long) gnutls_rc, (unsigned long long) unsent_len);
 159             unsent_len -= gnutls_rc;
 160             unsent += gnutls_rc;
 161         } else {
 162             crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
 163             break;
 164         }
 165     }
 166     return pcmk_rc_ok;
 167 }
 168 
 169 // \return Standard Pacemaker return code
 170 static int
 171 send_plaintext(int sock, struct iovec *iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 172 {
 173     const char *unsent = iov->iov_base;
 174     size_t unsent_len = iov->iov_len;
 175     ssize_t write_rc;
 176 
 177     if (unsent == NULL) {
 178         return EINVAL;
 179     }
 180 
 181     crm_debug("Sending plaintext message of %llu bytes to socket %d",
 182               (unsigned long long) unsent_len, sock);
 183     while (true) {
 184         write_rc = write(sock, unsent, unsent_len);
 185         if (write_rc < 0) {
 186             int rc = errno;
 187 
 188             if ((errno == EINTR) || (errno == EAGAIN)) {
 189                 crm_trace("Retrying to send %llu bytes remaining to socket %d",
 190                           (unsigned long long) unsent_len, sock);
 191                 continue;
 192             }
 193 
 194             // Caller can log as error if necessary
 195             crm_info("Could not send message: %s " QB_XS " rc=%d socket=%d",
 196                      pcmk_rc_str(rc), rc, sock);
 197             return rc;
 198 
 199         } else if (write_rc < unsent_len) {
 200             crm_trace("Sent %lld of %llu bytes remaining",
 201                       (long long) write_rc, (unsigned long long) unsent_len);
 202             unsent += write_rc;
 203             unsent_len -= write_rc;
 204             continue;
 205 
 206         } else {
 207             crm_trace("Sent all %lld bytes remaining: %.100s",
 208                       (long long) write_rc, (char *) (iov->iov_base));
 209             break;
 210         }
 211     }
 212     return pcmk_rc_ok;
 213 }
 214 
 215 // \return Standard Pacemaker return code
 216 static int
 217 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
     /* [previous][next][first][last][top][bottom][index][help] */
 218 {
 219     int rc = pcmk_rc_ok;
 220 
 221     for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
 222         if (remote->tls_session) {
 223             rc = send_tls(remote->tls_session, &(iov[lpc]));
 224             continue;
 225         }
 226         if (remote->tcp_socket) {
 227             rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
 228         } else {
 229             rc = ESOCKTNOSUPPORT;
 230         }
 231     }
 232     return rc;
 233 }
 234 
 235 /*!
 236  * \internal
 237  * \brief Send an XML message over a Pacemaker Remote connection
 238  *
 239  * \param[in,out] remote  Pacemaker Remote connection to use
 240  * \param[in]     msg     XML to send
 241  *
 242  * \return Standard Pacemaker return code
 243  */
 244 int
 245 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 246 {
 247     int rc = pcmk_rc_ok;
 248     static uint64_t id = 0;
 249     GString *xml_text = NULL;
 250 
 251     struct iovec iov[2];
 252     struct remote_header_v0 *header;
 253 
 254     CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
 255 
 256     xml_text = g_string_sized_new(1024);
 257     pcmk__xml_string(msg, 0, xml_text, 0);
 258     CRM_CHECK(xml_text->len > 0,
 259               g_string_free(xml_text, TRUE); return EINVAL);
 260 
 261     header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
 262 
 263     iov[0].iov_base = header;
 264     iov[0].iov_len = sizeof(struct remote_header_v0);
 265 
 266     iov[1].iov_len = 1 + xml_text->len;
 267     iov[1].iov_base = g_string_free(xml_text, FALSE);
 268 
 269     id++;
 270     header->id = id;
 271     header->endian = ENDIAN_LOCAL;
 272     header->version = REMOTE_MSG_VERSION;
 273     header->payload_offset = iov[0].iov_len;
 274     header->payload_uncompressed = iov[1].iov_len;
 275     header->size_total = iov[0].iov_len + iov[1].iov_len;
 276 
 277     rc = remote_send_iovs(remote, iov, 2);
 278     if (rc != pcmk_rc_ok) {
 279         crm_err("Could not send remote message: %s " QB_XS " rc=%d",
 280                 pcmk_rc_str(rc), rc);
 281     }
 282 
 283     free(iov[0].iov_base);
 284     g_free((gchar *) iov[1].iov_base);
 285     return rc;
 286 }
 287 
 288 /*!
 289  * \internal
 290  * \brief Obtain the XML from the currently buffered remote connection message
 291  *
 292  * \param[in,out] remote  Remote connection possibly with message available
 293  *
 294  * \return Newly allocated XML object corresponding to message data, or NULL
 295  * \note This effectively removes the message from the connection buffer.
 296  */
 297 xmlNode *
 298 pcmk__remote_message_xml(pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 299 {
 300     xmlNode *xml = NULL;
 301     struct remote_header_v0 *header = localized_remote_header(remote);
 302 
 303     if (header == NULL) {
 304         return NULL;
 305     }
 306 
 307     /* Support compression on the receiving end now, in case we ever want to add it later */
 308     if (header->payload_compressed) {
 309         int rc = 0;
 310         unsigned int size_u = 1 + header->payload_uncompressed;
 311         char *uncompressed =
 312             pcmk__assert_alloc(1, header->payload_offset + size_u);
 313 
 314         crm_trace("Decompressing message data %d bytes into %d bytes",
 315                  header->payload_compressed, size_u);
 316 
 317         rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
 318                                         remote->buffer + header->payload_offset,
 319                                         header->payload_compressed, 1, 0);
 320         rc = pcmk__bzlib2rc(rc);
 321 
 322         if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
 323             crm_warn("Couldn't decompress v%d message, we only understand v%d",
 324                      header->version, REMOTE_MSG_VERSION);
 325             free(uncompressed);
 326             return NULL;
 327 
 328         } else if (rc != pcmk_rc_ok) {
 329             crm_err("Decompression failed: %s " QB_XS " rc=%d",
 330                     pcmk_rc_str(rc), rc);
 331             free(uncompressed);
 332             return NULL;
 333         }
 334 
 335         pcmk__assert(size_u == header->payload_uncompressed);
 336 
 337         memcpy(uncompressed, remote->buffer, header->payload_offset);       /* Preserve the header */
 338         remote->buffer_size = header->payload_offset + size_u;
 339 
 340         free(remote->buffer);
 341         remote->buffer = uncompressed;
 342         header = localized_remote_header(remote);
 343     }
 344 
 345     /* take ownership of the buffer */
 346     remote->buffer_offset = 0;
 347 
 348     CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
 349 
 350     xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
 351     if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
 352         crm_warn("Couldn't parse v%d message, we only understand v%d",
 353                  header->version, REMOTE_MSG_VERSION);
 354 
 355     } else if (xml == NULL) {
 356         crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
 357     }
 358 
 359     crm_log_xml_trace(xml, "[remote msg]");
 360     return xml;
 361 }
 362 
 363 static int
 364 get_remote_socket(const pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 365 {
 366     if (remote->tls_session) {
 367         void *sock_ptr = gnutls_transport_get_ptr(remote->tls_session);
 368 
 369         return GPOINTER_TO_INT(sock_ptr);
 370     }
 371 
 372     if (remote->tcp_socket) {
 373         return remote->tcp_socket;
 374     }
 375 
 376     crm_err("Remote connection type undetermined (bug?)");
 377     return -1;
 378 }
 379 
 380 /*!
 381  * \internal
 382  * \brief Wait for a remote session to have data to read
 383  *
 384  * \param[in] remote      Connection to check
 385  * \param[in] timeout_ms  Maximum time (in ms) to wait
 386  *
 387  * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
 388  *         there is data ready to be read, and ETIME if there is no data within
 389  *         the specified timeout)
 390  */
 391 int
 392 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
     /* [previous][next][first][last][top][bottom][index][help] */
 393 {
 394     struct pollfd fds = { 0, };
 395     int sock = 0;
 396     int rc = 0;
 397     time_t start;
 398     int timeout = timeout_ms;
 399 
 400     sock = get_remote_socket(remote);
 401     if (sock <= 0) {
 402         crm_trace("No longer connected");
 403         return ENOTCONN;
 404     }
 405 
 406     start = time(NULL);
 407     errno = 0;
 408     do {
 409         fds.fd = sock;
 410         fds.events = POLLIN;
 411 
 412         /* If we got an EINTR while polling, and we have a
 413          * specific timeout we are trying to honor, attempt
 414          * to adjust the timeout to the closest second. */
 415         if (errno == EINTR && (timeout > 0)) {
 416             timeout = timeout_ms - ((time(NULL) - start) * 1000);
 417             if (timeout < 1000) {
 418                 timeout = 1000;
 419             }
 420         }
 421 
 422         rc = poll(&fds, 1, timeout);
 423     } while (rc < 0 && errno == EINTR);
 424 
 425     if (rc < 0) {
 426         return errno;
 427     }
 428     return (rc == 0)? ETIME : pcmk_rc_ok;
 429 }
 430 
 431 /*!
 432  * \internal
 433  * \brief Read bytes from non-blocking remote connection
 434  *
 435  * \param[in,out] remote  Remote connection to read
 436  *
 437  * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
 438  *         a full message has been received, or EAGAIN for a partial message)
 439  * \note Use only with non-blocking sockets after polling the socket.
 440  * \note This function will return when the socket read buffer is empty or an
 441  *       error is encountered.
 442  */
 443 int
 444 pcmk__read_available_remote_data(pcmk__remote_t *remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 445 {
 446     int rc = pcmk_rc_ok;
 447     size_t read_len = sizeof(struct remote_header_v0);
 448     struct remote_header_v0 *header = localized_remote_header(remote);
 449     ssize_t read_rc;
 450 
 451     if(header) {
 452         /* Stop at the end of the current message */
 453         read_len = header->size_total;
 454     }
 455 
 456     /* automatically grow the buffer when needed */
 457     if(remote->buffer_size < read_len) {
 458         remote->buffer_size = 2 * read_len;
 459         crm_trace("Expanding buffer to %llu bytes",
 460                   (unsigned long long) remote->buffer_size);
 461         remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
 462     }
 463 
 464     if (remote->tls_session) {
 465         read_rc = gnutls_record_recv(remote->tls_session,
 466                                      remote->buffer + remote->buffer_offset,
 467                                      remote->buffer_size - remote->buffer_offset);
 468         if (read_rc == GNUTLS_E_INTERRUPTED) {
 469             rc = EINTR;
 470         } else if (read_rc == GNUTLS_E_AGAIN) {
 471             rc = EAGAIN;
 472         } else if (read_rc < 0) {
 473             crm_debug("TLS receive failed: %s (%lld)",
 474                       gnutls_strerror(read_rc), (long long) read_rc);
 475             rc = EIO;
 476         }
 477     } else if (remote->tcp_socket) {
 478         read_rc = read(remote->tcp_socket,
 479                        remote->buffer + remote->buffer_offset,
 480                        remote->buffer_size - remote->buffer_offset);
 481         if (read_rc < 0) {
 482             rc = errno;
 483         }
 484     } else {
 485         crm_err("Remote connection type undetermined (bug?)");
 486         return ESOCKTNOSUPPORT;
 487     }
 488 
 489     /* process any errors. */
 490     if (read_rc > 0) {
 491         remote->buffer_offset += read_rc;
 492         /* always null terminate buffer, the +1 to alloc always allows for this. */
 493         remote->buffer[remote->buffer_offset] = '\0';
 494         crm_trace("Received %lld more bytes (%llu total)",
 495                   (long long) read_rc,
 496                   (unsigned long long) remote->buffer_offset);
 497 
 498     } else if ((rc == EINTR) || (rc == EAGAIN)) {
 499         crm_trace("No data available for non-blocking remote read: %s (%d)",
 500                   pcmk_rc_str(rc), rc);
 501 
 502     } else if (read_rc == 0) {
 503         crm_debug("End of remote data encountered after %llu bytes",
 504                   (unsigned long long) remote->buffer_offset);
 505         return ENOTCONN;
 506 
 507     } else {
 508         crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
 509                   (unsigned long long) remote->buffer_offset,
 510                   pcmk_rc_str(rc), rc);
 511         return ENOTCONN;
 512     }
 513 
 514     header = localized_remote_header(remote);
 515     if(header) {
 516         if(remote->buffer_offset < header->size_total) {
 517             crm_trace("Read partial remote message (%llu of %u bytes)",
 518                       (unsigned long long) remote->buffer_offset,
 519                       header->size_total);
 520         } else {
 521             crm_trace("Read full remote message of %llu bytes",
 522                       (unsigned long long) remote->buffer_offset);
 523             return pcmk_rc_ok;
 524         }
 525     }
 526 
 527     return EAGAIN;
 528 }
 529 
 530 /*!
 531  * \internal
 532  * \brief Read one message from a remote connection
 533  *
 534  * \param[in,out] remote      Remote connection to read
 535  * \param[in]     timeout_ms  Fail if message not read in this many milliseconds
 536  *                            (10s will be used if 0, and 60s if negative)
 537  *
 538  * \return Standard Pacemaker return code
 539  */
 540 int
 541 pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
     /* [previous][next][first][last][top][bottom][index][help] */
 542 {
 543     int rc = pcmk_rc_ok;
 544     time_t start = time(NULL);
 545     int remaining_timeout = 0;
 546 
 547     if (timeout_ms == 0) {
 548         timeout_ms = 10000;
 549     } else if (timeout_ms < 0) {
 550         timeout_ms = 60000;
 551     }
 552 
 553     remaining_timeout = timeout_ms;
 554     while (remaining_timeout > 0) {
 555 
 556         crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
 557                   remaining_timeout, timeout_ms);
 558         rc = pcmk__remote_ready(remote, remaining_timeout);
 559 
 560         if (rc == ETIME) {
 561             crm_err("Timed out (%d ms) while waiting for remote data",
 562                     remaining_timeout);
 563             return rc;
 564 
 565         } else if (rc != pcmk_rc_ok) {
 566             crm_debug("Wait for remote data aborted (will retry): %s "
 567                       QB_XS " rc=%d", pcmk_rc_str(rc), rc);
 568 
 569         } else {
 570             rc = pcmk__read_available_remote_data(remote);
 571             if (rc == pcmk_rc_ok) {
 572                 return rc;
 573             } else if (rc == EAGAIN) {
 574                 crm_trace("Waiting for more remote data");
 575             } else {
 576                 crm_debug("Could not receive remote data: %s " QB_XS " rc=%d",
 577                           pcmk_rc_str(rc), rc);
 578             }
 579         }
 580 
 581         // Don't waste time retrying after fatal errors
 582         if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
 583             return rc;
 584         }
 585 
 586         remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
 587     }
 588     return ETIME;
 589 }
 590 
 // State kept by the retry timer while an asynchronous connect is in progress
 struct tcp_async_cb_data {
     // Socket being connected
     int sock;

     // Give up if not connected within this much time
     int timeout_ms;

     // When the connection attempt began (0 if connect() succeeded immediately)
     time_t start;

     // Caller's data, passed back as the callback's first argument
     void *userdata;

     // If not NULL, called once with the result when the attempt completes
     void (*callback) (void *userdata, int rc, int sock);
 };
 598 
 599 // \return TRUE if timer should be rescheduled, FALSE otherwise
 600 static gboolean
 601 check_connect_finished(gpointer userdata)
     /* [previous][next][first][last][top][bottom][index][help] */
 602 {
 603     struct tcp_async_cb_data *cb_data = userdata;
 604     int rc;
 605 
 606     fd_set rset, wset;
 607     struct timeval ts = { 0, };
 608 
 609     if (cb_data->start == 0) {
 610         // Last connect() returned success immediately
 611         rc = pcmk_rc_ok;
 612         goto dispatch_done;
 613     }
 614 
 615     // If the socket is ready for reading or writing, the connect succeeded
 616     FD_ZERO(&rset);
 617     FD_SET(cb_data->sock, &rset);
 618     wset = rset;
 619     rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
 620 
 621     if (rc < 0) { // select() error
 622         rc = errno;
 623         if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
 624             if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
 625                 return TRUE; // There is time left, so reschedule timer
 626             } else {
 627                 rc = ETIMEDOUT;
 628             }
 629         }
 630         crm_trace("Could not check socket %d for connection success: %s (%d)",
 631                   cb_data->sock, pcmk_rc_str(rc), rc);
 632 
 633     } else if (rc == 0) { // select() timeout
 634         if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
 635             return TRUE; // There is time left, so reschedule timer
 636         }
 637         crm_debug("Timed out while waiting for socket %d connection success",
 638                   cb_data->sock);
 639         rc = ETIMEDOUT;
 640 
 641     // select() returned number of file descriptors that are ready
 642 
 643     } else if (FD_ISSET(cb_data->sock, &rset)
 644                || FD_ISSET(cb_data->sock, &wset)) {
 645 
 646         // The socket is ready; check it for connection errors
 647         int error = 0;
 648         socklen_t len = sizeof(error);
 649 
 650         if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
 651             rc = errno;
 652             crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
 653                       cb_data->sock, pcmk_rc_str(rc), rc);
 654         } else if (error != 0) {
 655             rc = error;
 656             crm_trace("Socket %d connected with error: %s (%d)",
 657                       cb_data->sock, pcmk_rc_str(rc), rc);
 658         } else {
 659             rc = pcmk_rc_ok;
 660         }
 661 
 662     } else { // Should not be possible
 663         crm_trace("select() succeeded, but socket %d not in resulting "
 664                   "read/write sets", cb_data->sock);
 665         rc = EAGAIN;
 666     }
 667 
 668   dispatch_done:
 669     if (rc == pcmk_rc_ok) {
 670         crm_trace("Socket %d is connected", cb_data->sock);
 671     } else {
 672         close(cb_data->sock);
 673         cb_data->sock = -1;
 674     }
 675 
 676     if (cb_data->callback) {
 677         cb_data->callback(cb_data->userdata, rc, cb_data->sock);
 678     }
 679     free(cb_data);
 680     return FALSE; // Do not reschedule timer
 681 }
 682 
 683 /*!
 684  * \internal
 685  * \brief Attempt to connect socket, calling callback when done
 686  *
 687  * Set a given socket non-blocking, then attempt to connect to it,
 688  * retrying periodically until success or a timeout is reached.
 689  * Call a caller-supplied callback function when completed.
 690  *
 691  * \param[in]  sock        Newly created socket
 692  * \param[in]  addr        Socket address information for connect
 693  * \param[in]  addrlen     Size of socket address information in bytes
 694  * \param[in]  timeout_ms  Fail if not connected within this much time
 695  * \param[out] timer_id    If not NULL, store retry timer ID here
 696  * \param[in]  userdata    User data to pass to callback
 697  * \param[in]  callback    Function to call when connection attempt completes
 698  *
 699  * \return Standard Pacemaker return code
 700  */
 701 static int
 702 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
     /* [previous][next][first][last][top][bottom][index][help] */
 703                      int timeout_ms, int *timer_id, void *userdata,
 704                      void (*callback) (void *userdata, int rc, int sock))
 705 {
 706     int rc = 0;
 707     int interval = 500;
 708     int timer;
 709     struct tcp_async_cb_data *cb_data = NULL;
 710 
 711     rc = pcmk__set_nonblocking(sock);
 712     if (rc != pcmk_rc_ok) {
 713         crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
 714                  pcmk_rc_str(rc), rc);
 715         return rc;
 716     }
 717 
 718     rc = connect(sock, addr, addrlen);
 719     if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
 720         rc = errno;
 721         crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
 722                  pcmk_rc_str(rc), rc);
 723         return rc;
 724     }
 725 
 726     cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
 727     cb_data->userdata = userdata;
 728     cb_data->callback = callback;
 729     cb_data->sock = sock;
 730     cb_data->timeout_ms = timeout_ms;
 731 
 732     if (rc == 0) {
 733         /* The connect was successful immediately, we still return to mainloop
 734          * and let this callback get called later. This avoids the user of this api
 735          * to have to account for the fact the callback could be invoked within this
 736          * function before returning. */
 737         cb_data->start = 0;
 738         interval = 1;
 739     } else {
 740         cb_data->start = time(NULL);
 741     }
 742 
 743     /* This timer function does a non-blocking poll on the socket to see if we
 744      * can use it. Once we can, the connect has completed. This method allows us
 745      * to connect without blocking the mainloop.
 746      *
 747      * @TODO Use a mainloop fd callback for this instead of polling. Something
 748      *       about the way mainloop is currently polling prevents this from
 749      *       working at the moment though. (See connect(2) regarding EINPROGRESS
 750      *       for possible new handling needed.)
 751      */
 752     crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
 753               interval, sock);
 754     timer = pcmk__create_timer(interval, check_connect_finished, cb_data);
 755     if (timer_id) {
 756         *timer_id = timer;
 757     }
 758 
 759     // timer callback should be taking care of cb_data
 760     // cppcheck-suppress memleak
 761     return pcmk_rc_ok;
 762 }
 763 
 764 /*!
 765  * \internal
 766  * \brief Attempt once to connect socket and set it non-blocking
 767  *
 768  * \param[in]  sock        Newly created socket
 769  * \param[in]  addr        Socket address information for connect
 770  * \param[in]  addrlen     Size of socket address information in bytes
 771  *
 772  * \return Standard Pacemaker return code
 773  */
 774 static int
 775 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
     /* [previous][next][first][last][top][bottom][index][help] */
 776 {
 777     int rc = connect(sock, addr, addrlen);
 778 
 779     if (rc < 0) {
 780         rc = errno;
 781         crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
 782                  pcmk_rc_str(rc), rc);
 783         return rc;
 784     }
 785 
 786     rc = pcmk__set_nonblocking(sock);
 787     if (rc != pcmk_rc_ok) {
 788         crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
 789                  pcmk_rc_str(rc), rc);
 790         return rc;
 791     }
 792 
 793     return pcmk_ok;
 794 }
 795 
 796 /*!
 797  * \internal
 798  * \brief Connect to server at specified TCP port
 799  *
 800  * \param[in]  host        Name of server to connect to
 801  * \param[in]  port        Server port to connect to
  802  * \param[in]  timeout     If asynchronous, fail if not connected in this time
 803  * \param[out] timer_id    If asynchronous and this is non-NULL, retry timer ID
 804  *                         will be put here (for ease of cancelling by caller)
 805  * \param[out] sock_fd     Where to store socket file descriptor
 806  * \param[in]  userdata    If asynchronous, data to pass to callback
 807  * \param[in]  callback    If NULL, attempt a single synchronous connection,
 808  *                         otherwise retry asynchronously then call this
 809  *
 810  * \return Standard Pacemaker return code
 811  */
 812 int
 813 pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
     /* [previous][next][first][last][top][bottom][index][help] */
 814                      int *sock_fd, void *userdata,
 815                      void (*callback) (void *userdata, int rc, int sock))
 816 {
 817     char buffer[INET6_ADDRSTRLEN];
 818     struct addrinfo *res = NULL;
 819     struct addrinfo *rp = NULL;
 820     struct addrinfo hints;
 821     const char *server = host;
 822     int rc;
 823     int sock = -1;
 824 
 825     CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);
 826 
 827     // Get host's IP address(es)
 828     memset(&hints, 0, sizeof(struct addrinfo));
 829     hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
 830     hints.ai_socktype = SOCK_STREAM;
 831     hints.ai_flags = AI_CANONNAME;
 832 
 833     rc = getaddrinfo(server, NULL, &hints, &res);
 834     rc = pcmk__gaierror2rc(rc);
 835 
 836     if (rc != pcmk_rc_ok) {
 837         crm_err("Unable to get IP address info for %s: %s",
 838                 server, pcmk_rc_str(rc));
 839         goto async_cleanup;
 840     }
 841 
 842     if (!res || !res->ai_addr) {
 843         crm_err("Unable to get IP address info for %s: no result", server);
 844         rc = ENOTCONN;
 845         goto async_cleanup;
 846     }
 847 
 848     // getaddrinfo() returns a list of host's addresses, try them in order
 849     for (rp = res; rp != NULL; rp = rp->ai_next) {
 850         struct sockaddr *addr = rp->ai_addr;
 851 
 852         if (!addr) {
 853             continue;
 854         }
 855 
 856         if (rp->ai_canonname) {
 857             server = res->ai_canonname;
 858         }
 859         crm_debug("Got canonical name %s for %s", server, host);
 860 
 861         sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
 862         if (sock == -1) {
 863             rc = errno;
 864             crm_warn("Could not create socket for remote connection to %s:%d: "
 865                      "%s " QB_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
 866             continue;
 867         }
 868 
 869         /* Set port appropriately for address family */
 870         /* (void*) casts avoid false-positive compiler alignment warnings */
 871         if (addr->sa_family == AF_INET6) {
 872             ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
 873         } else {
 874             ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
 875         }
 876 
 877         memset(buffer, 0, PCMK__NELEM(buffer));
 878         pcmk__sockaddr2str(addr, buffer);
 879         crm_info("Attempting remote connection to %s:%d", buffer, port);
 880 
 881         if (callback) {
 882             if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
 883                                      timer_id, userdata, callback) == pcmk_rc_ok) {
 884                 goto async_cleanup; /* Success for now, we'll hear back later in the callback */
 885             }
 886 
 887         } else if (connect_socket_once(sock, rp->ai_addr,
 888                                        rp->ai_addrlen) == pcmk_rc_ok) {
 889             break;          /* Success */
 890         }
 891 
 892         // Connect failed
 893         close(sock);
 894         sock = -1;
 895         rc = ENOTCONN;
 896     }
 897 
 898 async_cleanup:
 899 
 900     if (res) {
 901         freeaddrinfo(res);
 902     }
 903     *sock_fd = sock;
 904     return rc;
 905 }
 906 
 907 /*!
 908  * \internal
 909  * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 910  *
 911  * \param[in]  sa  Socket address for IP
 912  * \param[out] s   Storage for at least INET6_ADDRSTRLEN bytes
 913  *
 914  * \note sa The socket address can be a pointer to struct sockaddr_in (IPv4),
 915  *          struct sockaddr_in6 (IPv6) or struct sockaddr_storage (either),
 916  *          as long as its sa_family member is set correctly.
 917  */
 918 void
 919 pcmk__sockaddr2str(const void *sa, char *s)
     /* [previous][next][first][last][top][bottom][index][help] */
 920 {
 921     switch (((const struct sockaddr *) sa)->sa_family) {
 922         case AF_INET:
 923             inet_ntop(AF_INET, &(((const struct sockaddr_in *) sa)->sin_addr),
 924                       s, INET6_ADDRSTRLEN);
 925             break;
 926 
 927         case AF_INET6:
 928             inet_ntop(AF_INET6,
 929                       &(((const struct sockaddr_in6 *) sa)->sin6_addr),
 930                       s, INET6_ADDRSTRLEN);
 931             break;
 932 
 933         default:
 934             strcpy(s, "<invalid>");
 935     }
 936 }
 937 
 938 /*!
 939  * \internal
 940  * \brief Accept a client connection on a remote server socket
 941  *
 942  * \param[in]  ssock  Server socket file descriptor being listened on
 943  * \param[out] csock  Where to put new client socket's file descriptor
 944  *
 945  * \return Standard Pacemaker return code
 946  */
 947 int
 948 pcmk__accept_remote_connection(int ssock, int *csock)
     /* [previous][next][first][last][top][bottom][index][help] */
 949 {
 950     int rc;
 951     struct sockaddr_storage addr;
 952     socklen_t laddr = sizeof(addr);
 953     char addr_str[INET6_ADDRSTRLEN];
 954 #ifdef TCP_USER_TIMEOUT
 955     long sbd_timeout = 0;
 956 #endif
 957 
 958     /* accept the connection */
 959     memset(&addr, 0, sizeof(addr));
 960     *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
 961     if (*csock == -1) {
 962         rc = errno;
 963         crm_err("Could not accept remote client connection: %s "
 964                 QB_XS " rc=%d", pcmk_rc_str(rc), rc);
 965         return rc;
 966     }
 967     pcmk__sockaddr2str(&addr, addr_str);
 968     crm_info("Accepted new remote client connection from %s", addr_str);
 969 
 970     rc = pcmk__set_nonblocking(*csock);
 971     if (rc != pcmk_rc_ok) {
 972         crm_err("Could not set socket non-blocking: %s " QB_XS " rc=%d",
 973                 pcmk_rc_str(rc), rc);
 974         close(*csock);
 975         *csock = -1;
 976         return rc;
 977     }
 978 
 979 #ifdef TCP_USER_TIMEOUT
 980     sbd_timeout = pcmk__get_sbd_watchdog_timeout();
 981     if (sbd_timeout > 0) {
 982         // Time to fail and retry before watchdog
 983         long half = sbd_timeout / 2;
 984         unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
 985 
 986         rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
 987                         &optval, sizeof(optval));
 988         if (rc < 0) {
 989             rc = errno;
 990             crm_err("Could not set TCP timeout to %d ms on remote connection: "
 991                     "%s " QB_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
 992             close(*csock);
 993             *csock = -1;
 994             return rc;
 995         }
 996     }
 997 #endif
 998 
 999     return rc;
1000 }
1001 
1002 /*!
1003  * \brief Get the default remote connection TCP port on this host
1004  *
1005  * \return Remote connection TCP port number
1006  */
1007 int
1008 crm_default_remote_port(void)
     /* [previous][next][first][last][top][bottom][index][help] */
1009 {
1010     static int port = 0;
1011 
1012     if (port == 0) {
1013         const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1014 
1015         if (env) {
1016             errno = 0;
1017             port = strtol(env, NULL, 10);
1018             if (errno || (port < 1) || (port > 65535)) {
1019                 crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1020                          " has invalid value '%s', using %d instead",
1021                          env, DEFAULT_REMOTE_PORT);
1022                 port = DEFAULT_REMOTE_PORT;
1023             }
1024         } else {
1025             port = DEFAULT_REMOTE_PORT;
1026         }
1027     }
1028     return port;
1029 }

/* [previous][next][first][last][top][bottom][index][help] */