root/lib/common/remote.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. crm_remote_header
  2. crm_initiate_client_tls_handshake
  3. crm_create_anon_tls_session
  4. create_psk_tls_session
  5. crm_send_tls
  6. crm_send_plaintext
  7. crm_remote_sendv
  8. crm_remote_send
  9. crm_remote_parse_buffer
  10. crm_remote_ready
  11. crm_remote_recv_once
  12. crm_remote_recv
  13. check_connect_finished
  14. internal_tcp_connect_async
  15. internal_tcp_connect
  16. crm_remote_tcp_connect_async
  17. crm_remote_tcp_connect
  18. crm_sockaddr2str
  19. crm_remote_accept
  20. crm_default_remote_port

   1 /*
   2  * Copyright (c) 2008 Andrew Beekhof
   3  *
   4  * This library is free software; you can redistribute it and/or
   5  * modify it under the terms of the GNU Lesser General Public
   6  * License as published by the Free Software Foundation; either
   7  * version 2.1 of the License, or (at your option) any later version.
   8  * 
   9  * This library is distributed in the hope that it will be useful,
  10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  12  * Lesser General Public License for more details.
  13  * 
  14  * You should have received a copy of the GNU Lesser General Public
  15  * License along with this library; if not, write to the Free Software
  16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  17  *
  18  */
  19 #include <crm_internal.h>
  20 #include <crm/crm.h>
  21 
  22 #include <sys/param.h>
  23 #include <stdio.h>
  24 #include <sys/types.h>
  25 #include <sys/stat.h>
  26 #include <unistd.h>
  27 #include <sys/socket.h>
  28 #include <arpa/inet.h>
  29 #include <netinet/in.h>
  30 #include <netinet/ip.h>
  31 #include <netinet/tcp.h>
  32 #include <netdb.h>
  33 
  34 #include <stdlib.h>
  35 #include <errno.h>
  36 #include <glib.h>
  37 
  38 #include <bzlib.h>
  39 
  40 #include <crm/common/ipcs.h>
  41 #include <crm/common/xml.h>
  42 #include <crm/common/mainloop.h>
  43 
  44 #ifdef HAVE_GNUTLS_GNUTLS_H
  45 #  undef KEYFILE
  46 #  include <gnutls/gnutls.h>
  47 
  48 const int psk_tls_kx_order[] = {
  49     GNUTLS_KX_DHE_PSK,
  50     GNUTLS_KX_PSK,
  51 };
  52 
  53 const int anon_tls_kx_order[] = {
  54     GNUTLS_KX_ANON_DH,
  55     GNUTLS_KX_DHE_RSA,
  56     GNUTLS_KX_DHE_DSS,
  57     GNUTLS_KX_RSA,
  58     0
  59 };
  60 #endif
  61 
  62 /* Swab macros from linux/swab.h */
  63 #ifdef HAVE_LINUX_SWAB_H
  64 #  include <linux/swab.h>
  65 #else
   66 /*
   67  * Casts are necessary for constants, because we never know for sure
   68  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
   69  */
  70 #define __swab16(x) ((uint16_t)(                                      \
  71         (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) |                  \
  72         (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
  73 
  74 #define __swab32(x) ((uint32_t)(                                      \
  75         (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) |            \
  76         (((uint32_t)(x) & (uint32_t)0x0000ff00UL) <<  8) |            \
  77         (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >>  8) |            \
  78         (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
  79 
  80 #define __swab64(x) ((uint64_t)(                                      \
  81         (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) |   \
  82         (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) |   \
  83         (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) |   \
  84         (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) <<  8) |   \
  85         (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >>  8) |   \
  86         (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) |   \
  87         (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) |   \
  88         (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
  89 #endif
  90 
  91 #define REMOTE_MSG_VERSION 1
  92 #define ENDIAN_LOCAL 0xBADADBBD
  93 
   94 struct crm_remote_header_v0 
   95 {
      /* Header placed in front of each message payload by crm_remote_send().
       * The struct is packed and is interpreted directly on top of the receive
       * buffer, so field order and widths must stay as they are.
       * crm_remote_header() byte-swaps every field when the endian marker
       * shows the sender used the opposite byte order. */
   96     uint32_t endian;    /* Detect messages from hosts with different endian-ness */
      /* Sender's REMOTE_MSG_VERSION */
   97     uint32_t version;
      /* Message counter; crm_remote_send() increments it for every message */
   98     uint64_t id;
      /* Not assigned by crm_remote_send(), so it stays zero from calloc() */
   99     uint64_t flags;
      /* Header length plus payload length, in bytes */
  100     uint32_t size_total;
      /* Offset of the payload from the start of the buffer */
  101     uint32_t payload_offset;
      /* Non-zero means the payload is bzip2-compressed and this many bytes long */
  102     uint32_t payload_compressed;
      /* Payload length when not compressed; the sender counts the NUL terminator */
  103     uint32_t payload_uncompressed;
  104 
  105         /* New fields get added here */
  106 
  107 } __attribute__ ((packed));
 108 
 109 static struct crm_remote_header_v0 *
 110 crm_remote_header(crm_remote_t * remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 111 {
 112     struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
 113     if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
 114         return NULL;
 115 
 116     } else if(header->endian != ENDIAN_LOCAL) {
 117         uint32_t endian = __swab32(header->endian);
 118 
 119         CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
 120         if(endian != ENDIAN_LOCAL) {
 121             crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
 122                     ENDIAN_LOCAL, header->endian, endian);
 123             return NULL;
 124         }
 125 
 126         header->id = __swab64(header->id);
 127         header->flags = __swab64(header->flags);
 128         header->endian = __swab32(header->endian);
 129 
 130         header->version = __swab32(header->version);
 131         header->size_total = __swab32(header->size_total);
 132         header->payload_offset = __swab32(header->payload_offset);
 133         header->payload_compressed = __swab32(header->payload_compressed);
 134         header->payload_uncompressed = __swab32(header->payload_uncompressed);
 135     }
 136 
 137     return header;
 138 }
 139 
 140 #ifdef HAVE_GNUTLS_GNUTLS_H
 141 
 142 int
 143 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
     /* [previous][next][first][last][top][bottom][index][help] */
 144 {
 145     int rc = 0;
 146     int pollrc = 0;
 147     time_t start = time(NULL);
 148 
 149     do {
 150         rc = gnutls_handshake(*remote->tls_session);
 151         if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
 152             pollrc = crm_remote_ready(remote, 1000);
 153             if (pollrc < 0) {
 154                 /* poll returned error, there is no hope */
 155                 rc = -1;
 156             }
 157         }
 158 
 159     } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
 160              (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
 161 
 162     if (rc < 0) {
 163         crm_trace("gnutls_handshake() failed with %d", rc);
 164     }
 165     return rc;
 166 }
 167 
 168 void *
 169 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
     /* [previous][next][first][last][top][bottom][index][help] */
 170                             void *credentials)
 171 {
 172     gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
 173 
 174     gnutls_init(session, type);
 175 #  ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
 176 /*      http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
 177     gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
 178 /*      gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
 179 #  else
 180     gnutls_set_default_priority(*session);
 181     gnutls_kx_set_priority(*session, anon_tls_kx_order);
 182 #  endif
 183     gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
 184     switch (type) {
 185         case GNUTLS_SERVER:
 186             gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
 187                                    (gnutls_anon_server_credentials_t) credentials);
 188             break;
 189         case GNUTLS_CLIENT:
 190             gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
 191                                    (gnutls_anon_client_credentials_t) credentials);
 192             break;
 193     }
 194 
 195     return session;
 196 }
 197 
 198 void *
 199 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
     /* [previous][next][first][last][top][bottom][index][help] */
 200 {
 201     gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
 202 
 203     gnutls_init(session, type);
 204 #  ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
 205     gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
 206 #  else
 207     gnutls_set_default_priority(*session);
 208     gnutls_kx_set_priority(*session, psk_tls_kx_order);
 209 #  endif
 210     gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
 211     switch (type) {
 212         case GNUTLS_SERVER:
 213             gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
 214                                    (gnutls_psk_server_credentials_t) credentials);
 215             break;
 216         case GNUTLS_CLIENT:
 217             gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
 218                                    (gnutls_psk_client_credentials_t) credentials);
 219             break;
 220     }
 221 
 222     return session;
 223 }
 224 
 225 static int
 226 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
     /* [previous][next][first][last][top][bottom][index][help] */
 227 {
 228     const char *unsent = buf;
 229     int rc = 0;
 230     int total_send;
 231 
 232     if (buf == NULL) {
 233         return -EINVAL;
 234     }
 235 
 236     total_send = len;
 237     crm_trace("Message size: %llu", (unsigned long long) len);
 238 
 239     while (TRUE) {
 240         rc = gnutls_record_send(*session, unsent, len);
 241 
 242         if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
 243             crm_trace("Retrying to send %llu bytes",
 244                       (unsigned long long) len);
 245 
 246         } else if (rc < 0) {
 247             crm_err("Connection terminated: %s " CRM_XS " rc=%d",
 248                     gnutls_strerror(rc), rc);
 249             rc = -ECONNABORTED;
 250             break;
 251 
 252         } else if (rc < len) {
 253             crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
 254             len -= rc;
 255             unsent += rc;
 256         } else {
 257             crm_trace("Sent all %d bytes", rc);
 258             break;
 259         }
 260     }
 261 
 262     return rc < 0 ? rc : total_send;
 263 }
 264 #endif
 265 
 266 static int
 267 crm_send_plaintext(int sock, const char *buf, size_t len)
     /* [previous][next][first][last][top][bottom][index][help] */
 268 {
 269 
 270     int rc = 0;
 271     const char *unsent = buf;
 272     int total_send;
 273 
 274     if (buf == NULL) {
 275         return -EINVAL;
 276     }
 277     total_send = len;
 278 
 279     crm_trace("Message on socket %d: size=%llu",
 280               sock, (unsigned long long) len);
 281   retry:
 282     rc = write(sock, unsent, len);
 283     if (rc < 0) {
 284         rc = -errno;
 285         switch (errno) {
 286             case EINTR:
 287             case EAGAIN:
 288                 crm_trace("Retry");
 289                 goto retry;
 290             default:
 291                 crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
 292                 break;
 293         }
 294 
 295     } else if (rc < len) {
 296         crm_trace("Only sent %d of %llu remaining bytes",
 297                   rc, (unsigned long long) len);
 298         len -= rc;
 299         unsent += rc;
 300         goto retry;
 301 
 302     } else {
 303         crm_trace("Sent %d bytes: %.100s", rc, buf);
 304     }
 305 
 306     return rc < 0 ? rc : total_send;
 307 
 308 }
 309 
 310 static int
 311 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
     /* [previous][next][first][last][top][bottom][index][help] */
 312 {
 313     int lpc = 0;
 314     int rc = -ESOCKTNOSUPPORT;
 315 
 316     for(; lpc < iovs; lpc++) {
 317 
 318 #ifdef HAVE_GNUTLS_GNUTLS_H
 319         if (remote->tls_session) {
 320             rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
 321         } else if (remote->tcp_socket) {
 322 #else
 323         if (remote->tcp_socket) {
 324 #endif
 325             rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
 326 
 327         } else {
 328             crm_err("Unsupported connection type");
 329         }
 330     }
 331     return rc;
 332 }
 333 
 334 int
 335 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 336 {
 337     int rc = pcmk_ok;
 338     static uint64_t id = 0;
 339     char *xml_text = dump_xml_unformatted(msg);
 340 
 341     struct iovec iov[2];
 342     struct crm_remote_header_v0 *header;
 343 
 344     if (xml_text == NULL) {
 345         crm_err("Could not send remote message: no message provided");
 346         return -EINVAL;
 347     }
 348 
 349     header = calloc(1, sizeof(struct crm_remote_header_v0));
 350     iov[0].iov_base = header;
 351     iov[0].iov_len = sizeof(struct crm_remote_header_v0);
 352 
 353     iov[1].iov_base = xml_text;
 354     iov[1].iov_len = 1 + strlen(xml_text);
 355 
 356     id++;
 357     header->id = id;
 358     header->endian = ENDIAN_LOCAL;
 359     header->version = REMOTE_MSG_VERSION;
 360     header->payload_offset = iov[0].iov_len;
 361     header->payload_uncompressed = iov[1].iov_len;
 362     header->size_total = iov[0].iov_len + iov[1].iov_len;
 363 
 364     crm_trace("Sending len[0]=%d, start=%x",
 365               (int)iov[0].iov_len, *(int*)(void*)xml_text);
 366     rc = crm_remote_sendv(remote, iov, 2);
 367     if (rc < 0) {
 368         crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
 369                 pcmk_strerror(rc), rc);
 370     }
 371 
 372     free(iov[0].iov_base);
 373     free(iov[1].iov_base);
 374     return rc;
 375 }
 376 
 377 
 378 /*!
 379  * \internal
 380  * \brief handles the recv buffer and parsing out msgs.
 381  * \note new_data is owned by this function once it is passed in.
 382  */
 383 xmlNode *
 384 crm_remote_parse_buffer(crm_remote_t * remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 385 {
 386     xmlNode *xml = NULL;
 387     struct crm_remote_header_v0 *header = crm_remote_header(remote);
 388 
 389     if (remote->buffer == NULL || header == NULL) {
 390         return NULL;
 391     }
 392 
 393     /* Support compression on the receiving end now, in case we ever want to add it later */
 394     if (header->payload_compressed) {
 395         int rc = 0;
 396         unsigned int size_u = 1 + header->payload_uncompressed;
 397         char *uncompressed = calloc(1, header->payload_offset + size_u);
 398 
 399         crm_trace("Decompressing message data %d bytes into %d bytes",
 400                  header->payload_compressed, size_u);
 401 
 402         rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
 403                                         remote->buffer + header->payload_offset,
 404                                         header->payload_compressed, 1, 0);
 405 
 406         if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
 407             crm_warn("Couldn't decompress v%d message, we only understand v%d",
 408                      header->version, REMOTE_MSG_VERSION);
 409             free(uncompressed);
 410             return NULL;
 411 
 412         } else if (rc != BZ_OK) {
 413             crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
 414             free(uncompressed);
 415             return NULL;
 416         }
 417 
 418         CRM_ASSERT(size_u == header->payload_uncompressed);
 419 
 420         memcpy(uncompressed, remote->buffer, header->payload_offset);       /* Preserve the header */
 421         remote->buffer_size = header->payload_offset + size_u;
 422 
 423         free(remote->buffer);
 424         remote->buffer = uncompressed;
 425         header = crm_remote_header(remote);
 426     }
 427 
 428     /* take ownership of the buffer */
 429     remote->buffer_offset = 0;
 430 
 431     CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
 432 
 433     xml = string2xml(remote->buffer + header->payload_offset);
 434     if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
 435         crm_warn("Couldn't parse v%d message, we only understand v%d",
 436                  header->version, REMOTE_MSG_VERSION);
 437 
 438     } else if (xml == NULL) {
 439         crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
 440     }
 441 
 442     return xml;
 443 }
 444 
 445 /*!
 446  * \internal
 447  * \brief Wait for a remote session to have data to read
 448  *
 449  * \param[in] remote         Connection to check
 450  * \param[in] total_timeout  Maximum time (in ms) to wait
 451  *
 452  * \return Positive value if ready to be read, 0 on timeout, -errno on error
 453  */
 454 int
 455 crm_remote_ready(crm_remote_t *remote, int total_timeout)
     /* [previous][next][first][last][top][bottom][index][help] */
 456 {
 457     struct pollfd fds = { 0, };
 458     int sock = 0;
 459     int rc = 0;
 460     time_t start;
 461     int timeout = total_timeout;
 462 
 463 #ifdef HAVE_GNUTLS_GNUTLS_H
 464     if (remote->tls_session) {
 465         void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
 466 
 467         sock = GPOINTER_TO_INT(sock_ptr);
 468     } else if (remote->tcp_socket) {
 469 #else
 470     if (remote->tcp_socket) {
 471 #endif
 472         sock = remote->tcp_socket;
 473     } else {
 474         crm_err("Unsupported connection type");
 475     }
 476 
 477     if (sock <= 0) {
 478         crm_trace("No longer connected");
 479         return -ENOTCONN;
 480     }
 481 
 482     start = time(NULL);
 483     errno = 0;
 484     do {
 485         fds.fd = sock;
 486         fds.events = POLLIN;
 487 
 488         /* If we got an EINTR while polling, and we have a
 489          * specific timeout we are trying to honor, attempt
 490          * to adjust the timeout to the closest second. */
 491         if (errno == EINTR && (timeout > 0)) {
 492             timeout = total_timeout - ((time(NULL) - start) * 1000);
 493             if (timeout < 1000) {
 494                 timeout = 1000;
 495             }
 496         }
 497 
 498         rc = poll(&fds, 1, timeout);
 499     } while (rc < 0 && errno == EINTR);
 500 
 501     return (rc < 0)? -errno : rc;
 502 }
 503 
 504 
 505 /*!
 506  * \internal
 507  * \brief Read bytes off non blocking remote connection.
 508  *
 509  * \note only use with NON-Blocking sockets. Should only be used after polling socket.
 510  *       This function will return once max_size is met, the socket read buffer
 511  *       is empty, or an error is encountered.
 512  *
 513  * \retval number of bytes received
 514  */
 515 static size_t
 516 crm_remote_recv_once(crm_remote_t * remote)
     /* [previous][next][first][last][top][bottom][index][help] */
 517 {
 518     int rc = 0;
 519     size_t read_len = sizeof(struct crm_remote_header_v0);
 520     struct crm_remote_header_v0 *header = crm_remote_header(remote);
 521 
 522     if(header) {
 523         /* Stop at the end of the current message */
 524         read_len = header->size_total;
 525     }
 526 
 527     /* automatically grow the buffer when needed */
 528     if(remote->buffer_size < read_len) {
 529            remote->buffer_size = 2 * read_len;
 530         crm_trace("Expanding buffer to %llu bytes",
 531                   (unsigned long long) remote->buffer_size);
 532 
 533         remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
 534         CRM_ASSERT(remote->buffer != NULL);
 535     }
 536 
 537 #ifdef HAVE_GNUTLS_GNUTLS_H
 538     if (remote->tls_session) {
 539         rc = gnutls_record_recv(*(remote->tls_session),
 540                                 remote->buffer + remote->buffer_offset,
 541                                 remote->buffer_size - remote->buffer_offset);
 542         if (rc == GNUTLS_E_INTERRUPTED) {
 543             rc = -EINTR;
 544         } else if (rc == GNUTLS_E_AGAIN) {
 545             rc = -EAGAIN;
 546         } else if (rc < 0) {
 547             crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
 548             rc = -pcmk_err_generic;
 549         }
 550     } else if (remote->tcp_socket) {
 551 #else
 552     if (remote->tcp_socket) {
 553 #endif
 554         errno = 0;
 555         rc = read(remote->tcp_socket,
 556                   remote->buffer + remote->buffer_offset,
 557                   remote->buffer_size - remote->buffer_offset);
 558         if(rc < 0) {
 559             rc = -errno;
 560         }
 561 
 562     } else {
 563         crm_err("Unsupported connection type");
 564         return -ESOCKTNOSUPPORT;
 565     }
 566 
 567     /* process any errors. */
 568     if (rc > 0) {
 569         remote->buffer_offset += rc;
 570         /* always null terminate buffer, the +1 to alloc always allows for this. */
 571         remote->buffer[remote->buffer_offset] = '\0';
 572         crm_trace("Received %u more bytes, %llu total",
 573                   rc, (unsigned long long) remote->buffer_offset);
 574 
 575     } else if (rc == -EINTR || rc == -EAGAIN) {
 576         crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
 577 
 578     } else if (rc == 0) {
 579         crm_debug("EOF encoutered after %llu bytes",
 580                   (unsigned long long) remote->buffer_offset);
 581         return -ENOTCONN;
 582 
 583     } else {
 584         crm_debug("Error receiving message after %llu bytes: %s (%d)",
 585                   (unsigned long long) remote->buffer_offset,
 586                   pcmk_strerror(rc), rc);
 587         return -ENOTCONN;
 588     }
 589 
 590     header = crm_remote_header(remote);
 591     if(header) {
 592         if(remote->buffer_offset < header->size_total) {
 593             crm_trace("Read less than the advertised length: %llu < %u bytes",
 594                       (unsigned long long) remote->buffer_offset,
 595                       header->size_total);
 596         } else {
 597             crm_trace("Read full message of %llu bytes",
 598                       (unsigned long long) remote->buffer_offset);
 599             return remote->buffer_offset;
 600         }
 601     }
 602 
 603     return -EAGAIN;
 604 }
 605 
 606 /*!
 607  * \internal
 608  * \brief Read message(s) from a remote connection
 609  *
 610  * \param[in]  remote         Remote connection to read
 611  * \param[in]  total_timeout  Fail if message not read in this time (ms)
 612  * \param[out] disconnected   Will be set to 1 if disconnect detected
 613  *
 614  * \return TRUE if at least one full message read, FALSE otherwise
 615  */
 616 gboolean
 617 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
     /* [previous][next][first][last][top][bottom][index][help] */
 618 {
 619     int rc;
 620     time_t start = time(NULL);
 621     int remaining_timeout = 0;
 622 
 623     if (total_timeout == 0) {
 624         total_timeout = 10000;
 625     } else if (total_timeout < 0) {
 626         total_timeout = 60000;
 627     }
 628     *disconnected = 0;
 629 
 630     remaining_timeout = total_timeout;
 631     while ((remaining_timeout > 0) && !(*disconnected)) {
 632 
 633         crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
 634                   remaining_timeout, total_timeout);
 635         rc = crm_remote_ready(remote, remaining_timeout);
 636 
 637         if (rc == 0) {
 638             crm_err("Timed out (%d ms) while waiting for remote data",
 639                     remaining_timeout);
 640             return FALSE;
 641 
 642         } else if (rc < 0) {
 643             crm_debug("Wait for remote data aborted, will try again: %s "
 644                       CRM_XS " rc=%d", pcmk_strerror(rc), rc);
 645 
 646         } else {
 647             rc = crm_remote_recv_once(remote);
 648             if (rc > 0) {
 649                 return TRUE;
 650             } else if (rc == -EAGAIN) {
 651                 crm_trace("Still waiting for remote data");
 652             } else if (rc < 0) {
 653                 crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
 654                           pcmk_strerror(rc), rc);
 655             }
 656         }
 657 
 658         if (rc == -ENOTCONN) {
 659             *disconnected = 1;
 660             return FALSE;
 661         }
 662 
 663         remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
 664     }
 665 
 666     return FALSE;
 667 }
 668 
  669 struct tcp_async_cb_data {
      /* State carried between internal_tcp_connect_async() and the
       * check_connect_finished() timer callback, which frees it. */
      /* TRUE when connect() already succeeded before the timer was scheduled */
  670     gboolean success;
      /* Socket whose connection attempt is being watched */
  671     int sock;
      /* Opaque pointer handed back to the callback */
  672     void *userdata;
      /* Invoked with the socket fd on success or a negative errno on failure */
  673     void (*callback) (void *userdata, int sock);
  674     int timeout;                /*ms */
      /* time(NULL) when the connection attempt was started */
  675     time_t start;
  676 };
 677 
 678 static gboolean
 679 check_connect_finished(gpointer userdata)
     /* [previous][next][first][last][top][bottom][index][help] */
 680 {
 681     struct tcp_async_cb_data *cb_data = userdata;
 682     int cb_arg = 0; // socket fd on success, -errno on error
 683     int sock = cb_data->sock;
 684     int error = 0;
 685 
 686     fd_set rset, wset;
 687     socklen_t len = sizeof(error);
 688     struct timeval ts = { 0, };
 689 
 690     if (cb_data->success == TRUE) {
 691         goto dispatch_done;
 692     }
 693 
 694     FD_ZERO(&rset);
 695     FD_SET(sock, &rset);
 696     wset = rset;
 697 
 698     crm_trace("fd %d: checking to see if connect finished", sock);
 699     cb_arg = select(sock + 1, &rset, &wset, NULL, &ts);
 700 
 701     if (cb_arg < 0) {
 702         cb_arg = -errno;
 703         if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
 704             /* reschedule if there is still time left */
 705             if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
 706                 goto reschedule;
 707             } else {
 708                 cb_arg = -ETIMEDOUT;
 709             }
 710         }
 711         crm_trace("fd %d: select failed %d connect dispatch ", sock, cb_arg);
 712         goto dispatch_done;
 713     } else if (cb_arg == 0) {
 714         if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
 715             goto reschedule;
 716         }
 717         crm_debug("fd %d: timeout during select", sock);
 718         cb_arg = -ETIMEDOUT;
 719         goto dispatch_done;
 720     } else {
 721         crm_trace("fd %d: select returned success", sock);
 722         cb_arg = 0;
 723     }
 724 
 725     /* can we read or write to the socket now? */
 726     if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
 727         if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
 728             cb_arg = -errno;
 729             crm_trace("fd %d: call to getsockopt failed", sock);
 730             goto dispatch_done;
 731         }
 732         if (error) {
 733             crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
 734             cb_arg = -error;
 735             goto dispatch_done;
 736         }
 737     } else {
 738         crm_trace("neither read nor write set after select");
 739         cb_arg = -EAGAIN;
 740         goto dispatch_done;
 741     }
 742 
 743   dispatch_done:
 744     if (!cb_arg) {
 745         crm_trace("fd %d: connected", sock);
 746         /* Success, set the return code to the sock to report to the callback */
 747         cb_arg = cb_data->sock;
 748         cb_data->sock = 0;
 749     } else {
 750         close(sock);
 751     }
 752 
 753     if (cb_data->callback) {
 754         cb_data->callback(cb_data->userdata, cb_arg);
 755     }
 756     free(cb_data);
 757     return FALSE;
 758 
 759   reschedule:
 760 
 761     /* will check again next interval */
 762     return TRUE;
 763 }
 764 
 765 static int
 766 internal_tcp_connect_async(int sock,
     /* [previous][next][first][last][top][bottom][index][help] */
 767                            const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
 768                            int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
 769 {
 770     int rc = 0;
 771     int interval = 500;
 772     int timer;
 773     struct tcp_async_cb_data *cb_data = NULL;
 774 
 775     rc = crm_set_nonblocking(sock);
 776     if (rc < 0) {
 777         crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
 778                  pcmk_strerror(rc), rc);
 779         close(sock);
 780         return -1;
 781     }
 782 
 783     rc = connect(sock, addr, addrlen);
 784     if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
 785         crm_perror(LOG_WARNING, "connect");
 786         return -1;
 787     }
 788 
 789     cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
 790     cb_data->userdata = userdata;
 791     cb_data->callback = callback;
 792     cb_data->sock = sock;
 793     cb_data->timeout = timeout;
 794     cb_data->start = time(NULL);
 795 
 796     if (rc == 0) {
 797         /* The connect was successful immediately, we still return to mainloop
 798          * and let this callback get called later. This avoids the user of this api
 799          * to have to account for the fact the callback could be invoked within this
 800          * function before returning. */
 801         cb_data->success = TRUE;
 802         interval = 1;
 803     }
 804 
 805     /* Check connect finished is mostly doing a non-block poll on the socket
 806      * to see if we can read/write to it. Once we can, the connect has completed.
 807      * This method allows us to connect to the server without blocking mainloop.
 808      *
 809      * This is a poor man's way of polling to see when the connection finished.
 810      * At some point we should figure out a way to use a mainloop fd callback for this.
 811      * Something about the way mainloop is currently polling prevents this from working at the
 812      * moment though. */
 813     crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
 814               interval, sock);
 815     timer = g_timeout_add(interval, check_connect_finished, cb_data);
 816     if (timer_id) {
 817         *timer_id = timer;
 818     }
 819 
 820     return 0;
 821 }
 822 
 823 static int
 824 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
     /* [previous][next][first][last][top][bottom][index][help] */
 825 {
 826     int rc = connect(sock, addr, addrlen);
 827 
 828     if (rc < 0) {
 829         rc = -errno;
 830         crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
 831                  pcmk_strerror(rc), rc);
 832         return rc;
 833     }
 834 
 835     rc = crm_set_nonblocking(sock);
 836     if (rc < 0) {
 837         crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
 838                  pcmk_strerror(rc), rc);
 839         return rc;
 840     }
 841 
 842     return pcmk_ok;
 843 }
 844 
 845 /*!
 846  * \internal
 847  * \brief Connect to server at specified TCP port
 848  *
 849  * \param[in]  host      Name of server to connect to
 850  * \param[in]  port      Server port to connect to
 851  * \param[in]  timeout   Report error if not connected in this many milliseconds
 852  * \param[out] timer_id  If non-NULL, will be set to timer ID, if asynchronous
 853  * \param[in]  userdata  Data to pass to callback, if asynchronous
 854  * \param[in]  callback  If non-NULL, connect asynchronously then call this
 855  *
 856  * \return File descriptor of connected socket on success, -ENOTCONN otherwise
 857  */
 858 int
 859 crm_remote_tcp_connect_async(const char *host, int port, int timeout,
     /* [previous][next][first][last][top][bottom][index][help] */
 860                              int *timer_id, void *userdata,
 861                              void (*callback) (void *userdata, int sock))
 862 {
 863     char buffer[INET6_ADDRSTRLEN];
 864     struct addrinfo *res = NULL;
 865     struct addrinfo *rp = NULL;
 866     struct addrinfo hints;
 867     const char *server = host;
 868     int ret_ga;
 869     int sock = -ENOTCONN;
 870 
 871     // Get host's IP address(es)
 872     memset(&hints, 0, sizeof(struct addrinfo));
 873     hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
 874     hints.ai_socktype = SOCK_STREAM;
 875     hints.ai_flags = AI_CANONNAME;
 876     ret_ga = getaddrinfo(server, NULL, &hints, &res);
 877     if (ret_ga) {
 878         crm_err("Unable to get IP address info for %s: %s",
 879                 server, gai_strerror(ret_ga));
 880         goto async_cleanup;
 881     }
 882     if (!res || !res->ai_addr) {
 883         crm_err("Unable to get IP address info for %s: no result", server);
 884         goto async_cleanup;
 885     }
 886 
 887     // getaddrinfo() returns a list of host's addresses, try them in order
 888     for (rp = res; rp != NULL; rp = rp->ai_next) {
 889         struct sockaddr *addr = rp->ai_addr;
 890 
 891         if (!addr) {
 892             continue;
 893         }
 894 
 895         if (rp->ai_canonname) {
 896             server = res->ai_canonname;
 897         }
 898         crm_debug("Got canonical name %s for %s", server, host);
 899 
 900         sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
 901         if (sock == -1) {
 902             crm_perror(LOG_WARNING, "creating socket for connection to %s",
 903                        server);
 904             sock = -ENOTCONN;
 905             continue;
 906         }
 907 
 908         /* Set port appropriately for address family */
 909         /* (void*) casts avoid false-positive compiler alignment warnings */
 910         if (addr->sa_family == AF_INET6) {
 911             ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
 912         } else {
 913             ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
 914         }
 915 
 916         memset(buffer, 0, DIMOF(buffer));
 917         crm_sockaddr2str(addr, buffer);
 918         crm_info("Attempting TCP connection to %s:%d", buffer, port);
 919 
 920         if (callback) {
 921             if (internal_tcp_connect_async
 922                 (sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
 923                 goto async_cleanup; /* Success for now, we'll hear back later in the callback */
 924             }
 925 
 926         } else if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
 927             break;          /* Success */
 928         }
 929 
 930         close(sock);
 931         sock = -ENOTCONN;
 932     }
 933 
 934 async_cleanup:
 935 
 936     if (res) {
 937         freeaddrinfo(res);
 938     }
 939     return sock;
 940 }
 941 
 942 int
 943 crm_remote_tcp_connect(const char *host, int port)
     /* [previous][next][first][last][top][bottom][index][help] */
 944 {
 945     return crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);
 946 }
 947 
 948 /*!
 949  * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 950  *
 951  * \param[in]  sa  Socket address for IP
 952  * \param[out] s   Storage for at least INET6_ADDRSTRLEN bytes
 953  *
 954  * \note sa The socket address can be a pointer to struct sockaddr_in (IPv4),
 955  *          struct sockaddr_in6 (IPv6) or struct sockaddr_storage (either),
 956  *          as long as its sa_family member is set correctly.
 957  */
 958 void
 959 crm_sockaddr2str(void *sa, char *s)
     /* [previous][next][first][last][top][bottom][index][help] */
 960 {
 961     switch (((struct sockaddr*)sa)->sa_family) {
 962         case AF_INET:
 963             inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
 964                       s, INET6_ADDRSTRLEN);
 965             break;
 966 
 967         case AF_INET6:
 968             inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
 969                       s, INET6_ADDRSTRLEN);
 970             break;
 971 
 972         default:
 973             strcpy(s, "<invalid>");
 974     }
 975 }
 976 
 977 int
 978 crm_remote_accept(int ssock)
     /* [previous][next][first][last][top][bottom][index][help] */
 979 {
 980     int csock = 0;
 981     int rc = 0;
 982     unsigned laddr = 0;
 983     struct sockaddr_storage addr;
 984     char addr_str[INET6_ADDRSTRLEN];
 985 #ifdef TCP_USER_TIMEOUT
 986     int optval;
 987     long sbd_timeout = crm_get_sbd_timeout();
 988 #endif
 989 
 990     /* accept the connection */
 991     laddr = sizeof(addr);
 992     memset(&addr, 0, sizeof(addr));
 993     csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
 994     crm_sockaddr2str(&addr, addr_str);
 995     crm_info("New remote connection from %s", addr_str);
 996 
 997     if (csock == -1) {
 998         crm_err("accept socket failed");
 999         return -1;
1000     }
1001 
1002     rc = crm_set_nonblocking(csock);
1003     if (rc < 0) {
1004         crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1005                 pcmk_strerror(rc), rc);
1006         close(csock);
1007         return rc;
1008     }
1009 
1010 #ifdef TCP_USER_TIMEOUT
1011     if (sbd_timeout > 0) {
1012         optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1013         rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1014                         &optval, sizeof(optval));
1015         if (rc < 0) {
1016             crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1017                     optval);
1018             close(csock);
1019             return rc;
1020         }
1021     }
1022 #endif
1023 
1024     return csock;
1025 }
1026 
1027 /*!
1028  * \brief Get the default remote connection TCP port on this host
1029  *
1030  * \return Remote connection TCP port number
1031  */
1032 int
1033 crm_default_remote_port()
     /* [previous][next][first][last][top][bottom][index][help] */
1034 {
1035     static int port = 0;
1036 
1037     if (port == 0) {
1038         const char *env = getenv("PCMK_remote_port");
1039 
1040         if (env) {
1041             errno = 0;
1042             port = strtol(env, NULL, 10);
1043             if (errno || (port < 1) || (port > 65535)) {
1044                 crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1045                          env, DEFAULT_REMOTE_PORT);
1046                 port = DEFAULT_REMOTE_PORT;
1047             }
1048         } else {
1049             port = DEFAULT_REMOTE_PORT;
1050         }
1051     }
1052     return port;
1053 }

/* [previous][next][first][last][top][bottom][index][help] */