18#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/ip.h>
22#include <netinet/tcp.h>
37#include <gnutls/gnutls.h>
40#ifdef HAVE_LINUX_SWAB_H
41# include <linux/swab.h>
47#define __swab16(x) ((uint16_t)( \
48 (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
49 (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
51#define __swab32(x) ((uint32_t)( \
52 (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
53 (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
54 (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
55 (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
57#define __swab64(x) ((uint64_t)( \
58 (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
59 (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
60 (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
61 (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
62 (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
63 (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
64 (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
65 (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
68#define REMOTE_MSG_VERSION 1
69#define ENDIAN_LOCAL 0xBADADBBD
71struct remote_header_v0 {
77 uint32_t payload_offset;
78 uint32_t payload_compressed;
79 uint32_t payload_uncompressed;
96static struct remote_header_v0 *
99 struct remote_header_v0 *header = (
struct remote_header_v0 *)remote->
buffer;
100 if(remote->buffer_offset <
sizeof(
struct remote_header_v0)) {
108 crm_err(
"Invalid message detected, endian mismatch: %" PRIx32
109 " is neither %" PRIx32
" nor the swab'd %" PRIx32,
115 header->flags =
__swab64(header->flags);
116 header->endian =
__swab32(header->endian);
118 header->version =
__swab32(header->version);
119 header->size_total =
__swab32(header->size_total);
120 header->payload_offset =
__swab32(header->payload_offset);
121 header->payload_compressed =
__swab32(header->payload_compressed);
122 header->payload_uncompressed =
__swab32(header->payload_uncompressed);
130send_tls(gnutls_session_t session,
struct iovec *iov)
132 const char *unsent = iov->iov_base;
133 size_t unsent_len = iov->iov_len;
136 if (unsent == NULL) {
140 crm_trace(
"Sending TLS message of %zu bytes", unsent_len);
143 gnutls_rc = gnutls_record_send(session, unsent, unsent_len);
145 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
146 crm_trace(
"Retrying to send %zu bytes remaining", unsent_len);
148 }
else if (gnutls_rc < 0) {
150 crm_info(
"TLS connection terminated: %s " QB_XS
" rc=%zd",
151 gnutls_strerror((
int) gnutls_rc), gnutls_rc);
154 }
else if (gnutls_rc < unsent_len) {
155 crm_trace(
"Sent %zd of %zu bytes remaining", gnutls_rc, unsent_len);
156 unsent_len -= gnutls_rc;
159 crm_trace(
"Sent all %zd bytes remaining", gnutls_rc);
168send_plaintext(
int sock,
struct iovec *iov)
170 const char *unsent = iov->iov_base;
171 size_t unsent_len = iov->iov_len;
173 if (unsent == NULL) {
177 crm_debug(
"Sending plaintext message of %zu bytes to socket %d",
180 ssize_t write_rc = write(sock, unsent, unsent_len);
185 if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
186 crm_trace(
"Retrying to send %zu bytes remaining to socket %d",
192 crm_info(
"Could not send message: %s " QB_XS
" rc=%d socket=%d",
196 }
else if (write_rc < unsent_len) {
197 crm_trace(
"Sent %zd of %zu bytes remaining", write_rc, unsent_len);
199 unsent_len -= write_rc;
202 crm_trace(
"Sent all %zd bytes remaining: %.100s",
203 write_rc, (
char *) (iov->iov_base));
211remote_send_iovs(
pcmk__remote_t *remote,
struct iovec *iov,
int iovs)
215 for (
int lpc = 0; (lpc < iovs) && (rc ==
pcmk_rc_ok); lpc++) {
221 rc = send_plaintext(remote->
tcp_socket, &(iov[lpc]));
223 rc = ESOCKTNOSUPPORT;
242 static uint64_t
id = 0;
243 GString *xml_text = NULL;
246 struct remote_header_v0 *header;
248 CRM_CHECK((remote != NULL) && (msg != NULL),
return EINVAL);
250 xml_text = g_string_sized_new(1024);
253 g_string_free(xml_text, TRUE);
return EINVAL);
257 iov[0].iov_base = header;
258 iov[0].iov_len =
sizeof(
struct remote_header_v0);
260 iov[1].iov_len = 1 + xml_text->len;
261 iov[1].iov_base = g_string_free(xml_text, FALSE);
267 header->payload_offset = iov[0].iov_len;
268 header->payload_uncompressed = iov[1].iov_len;
269 header->size_total = iov[0].iov_len + iov[1].iov_len;
271 rc = remote_send_iovs(remote, iov, 2);
273 crm_err(
"Could not send remote message: %s " QB_XS
" rc=%d",
277 free(iov[0].iov_base);
278 g_free((gchar *) iov[1].iov_base);
295 struct remote_header_v0 *header = localized_remote_header(remote);
297 if (header == NULL) {
302 if (header->payload_compressed) {
304 unsigned int size_u = 1 + header->payload_uncompressed;
308 crm_trace(
"Decompressing message data %d bytes into %d bytes",
309 header->payload_compressed, size_u);
311 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
312 remote->
buffer + header->payload_offset,
313 header->payload_compressed, 1, 0);
317 crm_warn(
"Couldn't decompress v%d message, we only understand v%d",
323 crm_err(
"Decompression failed: %s " QB_XS
" rc=%d",
331 memcpy(uncompressed, remote->
buffer, header->payload_offset);
332 remote->
buffer_size = header->payload_offset + size_u;
335 remote->
buffer = uncompressed;
336 header = localized_remote_header(remote);
342 CRM_LOG_ASSERT(remote->
buffer[
sizeof(
struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
346 crm_warn(
"Couldn't parse v%d message, we only understand v%d",
349 }
else if (xml == NULL) {
350 crm_err(
"Couldn't parse: '%.120s'", remote->
buffer + header->payload_offset);
366 crm_err(
"Remote connection type undetermined (bug?)");
384 struct pollfd fds = { 0, };
390 sock = get_remote_socket(remote);
405 if (errno == EINTR && (
timeout > 0)) {
406 timeout = timeout_ms - ((time(NULL) - start) * 1000);
413 }
while (rc < 0 && errno == EINTR);
437 size_t read_len =
sizeof(
struct remote_header_v0);
438 struct remote_header_v0 *header = localized_remote_header(remote);
443 read_len = header->size_total;
457 if (read_rc == GNUTLS_E_INTERRUPTED) {
459 }
else if (read_rc == GNUTLS_E_AGAIN) {
461 }
else if (read_rc < 0) {
462 crm_debug(
"TLS receive failed: %s (%zd)",
463 gnutls_strerror((
int) read_rc), read_rc);
474 crm_err(
"Remote connection type undetermined (bug?)");
475 return ESOCKTNOSUPPORT;
483 crm_trace(
"Received %zd more bytes (%zu total)",
486 }
else if (read_rc == 0) {
487 crm_debug(
"End of remote data encountered after %zu bytes",
491 }
else if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
492 crm_trace(
"No data available for non-blocking remote read: %s (%d)",
496 crm_debug(
"Error receiving remote data after %zu bytes: %s (%d)",
501 header = localized_remote_header(remote);
504 crm_trace(
"Read partial remote message (%zu of %" PRIu32
" bytes)",
507 crm_trace(
"Read full remote message of %zu bytes",
530 time_t start = time(NULL);
531 int remaining_timeout = 0;
533 if (timeout_ms == 0) {
535 }
else if (timeout_ms < 0) {
539 remaining_timeout = timeout_ms;
540 while (remaining_timeout > 0) {
542 crm_trace(
"Waiting for remote data (%d ms of %d ms timeout remaining)",
543 remaining_timeout, timeout_ms);
547 crm_err(
"Timed out (%d ms) while waiting for remote data",
552 crm_debug(
"Wait for remote data aborted (will retry): %s "
559 }
else if (rc == EAGAIN) {
560 crm_trace(
"Waiting for more remote data");
562 crm_debug(
"Could not receive remote data: %s " QB_XS
" rc=%d",
568 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
572 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
577struct tcp_async_cb_data {
582 void (*callback) (
void *userdata,
int rc,
int sock);
587check_connect_finished(gpointer userdata)
589 struct tcp_async_cb_data *cb_data = userdata;
593 struct timeval ts = { 0, };
595 if (cb_data->start == 0) {
603 FD_SET(cb_data->sock, &rset);
605 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
609 if ((rc == EINTR) || (rc == EAGAIN)) {
616 crm_trace(
"Could not check socket %d for connection success: %s (%d)",
619 }
else if (rc == 0) {
623 crm_debug(
"Timed out while waiting for socket %d connection success",
629 }
else if (FD_ISSET(cb_data->sock, &rset)
630 || FD_ISSET(cb_data->sock, &wset)) {
634 socklen_t len =
sizeof(error);
636 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
638 crm_trace(
"Couldn't check socket %d for connection errors: %s (%d)",
640 }
else if (error != 0) {
642 crm_trace(
"Socket %d connected with error: %s (%d)",
649 crm_trace(
"select() succeeded, but socket %d not in resulting "
650 "read/write sets", cb_data->sock);
656 crm_trace(
"Socket %d is connected", cb_data->sock);
658 close(cb_data->sock);
662 if (cb_data->callback) {
663 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
688connect_socket_retry(
int sock,
const struct sockaddr *addr, socklen_t addrlen,
689 int timeout_ms,
int *timer_id,
void *userdata,
690 void (*callback) (
void *userdata,
int rc,
int sock))
695 struct tcp_async_cb_data *cb_data = NULL;
699 crm_warn(
"Could not set socket non-blocking: %s " QB_XS
" rc=%d",
704 rc = connect(sock, addr, addrlen);
714 crm_warn(
"Could not connect socket: %s " QB_XS
" rc=%d",
721 cb_data->userdata = userdata;
722 cb_data->callback = callback;
723 cb_data->sock = sock;
724 cb_data->timeout_ms = timeout_ms;
734 cb_data->start = time(NULL);
746 crm_trace(
"Scheduling check in %dms for whether connect to fd %d finished",
767connect_socket_once(
int sock,
const struct sockaddr *addr, socklen_t addrlen)
769 int rc = connect(sock, addr, addrlen);
773 crm_warn(
"Could not connect socket: %s " QB_XS
" rc=%d",
780 crm_warn(
"Could not set socket non-blocking: %s " QB_XS
" rc=%d",
806 int *sock_fd,
void *userdata,
807 void (*callback) (
void *userdata,
int rc,
int sock))
809 char buffer[INET6_ADDRSTRLEN];
810 struct addrinfo *res = NULL;
811 struct addrinfo *rp = NULL;
812 struct addrinfo hints;
813 const char *server =
host;
817 CRM_CHECK((
host != NULL) && (sock_fd != NULL),
return EINVAL);
820 memset(&hints, 0,
sizeof(
struct addrinfo));
821 hints.ai_family = AF_UNSPEC;
822 hints.ai_socktype = SOCK_STREAM;
823 hints.ai_flags = AI_CANONNAME;
825 rc = getaddrinfo(server, NULL, &hints, &res);
829 crm_err(
"Unable to get IP address info for %s: %s",
834 if (!res || !res->ai_addr) {
835 crm_err(
"Unable to get IP address info for %s: no result", server);
841 for (rp = res; rp != NULL; rp = rp->ai_next) {
842 struct sockaddr *addr = rp->ai_addr;
848 if (rp->ai_canonname) {
849 server = res->ai_canonname;
853 sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
856 crm_warn(
"Could not create socket for remote connection to %s:%d: "
857 "%s " QB_XS
" rc=%d", server, port,
pcmk_rc_str(rc), rc);
863 if (addr->sa_family == AF_INET6) {
864 ((
struct sockaddr_in6 *)(
void*)addr)->sin6_port = htons(port);
866 ((
struct sockaddr_in *)(
void*)addr)->sin_port = htons(port);
871 crm_info(
"Attempting remote connection to %s:%d", buffer, port);
874 if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
timeout,
875 timer_id, userdata, callback) ==
pcmk_rc_ok) {
879 }
else if (connect_socket_once(sock, rp->ai_addr,
913 switch (((
const struct sockaddr *) sa)->sa_family) {
915 inet_ntop(AF_INET, &(((
const struct sockaddr_in *) sa)->sin_addr),
916 s, INET6_ADDRSTRLEN);
921 &(((
const struct sockaddr_in6 *) sa)->sin6_addr),
922 s, INET6_ADDRSTRLEN);
926 strcpy(s,
"<invalid>");
943 struct sockaddr_storage addr;
944 socklen_t laddr =
sizeof(addr);
945 char addr_str[INET6_ADDRSTRLEN];
946#ifdef TCP_USER_TIMEOUT
947 long sbd_timeout = 0;
951 memset(&addr, 0,
sizeof(addr));
952 *csock = accept(ssock, (
struct sockaddr *)&addr, &laddr);
955 crm_err(
"Could not accept remote client connection: %s "
960 crm_info(
"Accepted new remote client connection from %s", addr_str);
964 crm_err(
"Could not set socket non-blocking: %s " QB_XS
" rc=%d",
971#ifdef TCP_USER_TIMEOUT
973 if (sbd_timeout > 0) {
975 long half = sbd_timeout / 2;
976 unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
978 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
979 &optval,
sizeof(optval));
982 crm_err(
"Could not set TCP timeout to %d ms on remote connection: "
983 "%s " QB_XS
" rc=%d", optval,
pcmk_rc_str(rc), rc);
1002 static int port = 0;
1009 port = strtol(env, NULL, 10);
1010 if (errno || (port < 1) || (port > 65535)) {
1012 " has invalid value '%s', using %d instead",
guint pcmk__timeout_ms2s(guint timeout_ms)
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
#define pcmk__assert_alloc(nmemb, size)
struct tcp_async_cb_data __attribute__
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
#define REMOTE_MSG_VERSION
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
int pcmk__read_available_remote_data(pcmk__remote_t *remote)
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
int pcmk__accept_remote_connection(int ssock, int *csock)
void pcmk__sockaddr2str(const void *sa, char *s)
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
int pcmk__set_nonblocking(int fd)
#define crm_info(fmt, args...)
#define crm_warn(fmt, args...)
#define CRM_LOG_ASSERT(expr)
#define CRM_CHECK(expr, failure_action)
#define crm_debug(fmt, args...)
#define crm_err(fmt, args...)
#define crm_log_xml_trace(xml, text)
#define crm_trace(fmt, args...)
#define DEFAULT_REMOTE_PORT
Wrappers for and extensions to glib mainloop.
#define PCMK__ENV_REMOTE_PORT
long pcmk__get_sbd_watchdog_timeout(void)
const char * pcmk__env_option(const char *option)
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
#define pcmk__assert(expr)
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
gnutls_session_t tls_session
int pcmk__tls_get_client_sock(const pcmk__remote_t *remote)
Wrappers for and extensions to libxml2.
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
xmlNode * pcmk__xml_parse(const char *input)