13 #include <sys/param.h> 15 #include <sys/types.h> 18 #include <sys/socket.h> 19 #include <arpa/inet.h> 20 #include <netinet/in.h> 21 #include <netinet/ip.h> 22 #include <netinet/tcp.h> 36 #ifdef HAVE_GNUTLS_GNUTLS_H 37 # include <gnutls/gnutls.h> 41 #ifdef HAVE_LINUX_SWAB_H 42 # include <linux/swab.h> 48 #define __swab16(x) ((uint16_t)( \ 49 (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \ 50 (((uint16_t)(x) & (uint16_t)0xff00U) >> 8))) 52 #define __swab32(x) ((uint32_t)( \ 53 (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \ 54 (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \ 55 (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \ 56 (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24))) 58 #define __swab64(x) ((uint64_t)( \ 59 (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \ 60 (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \ 61 (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \ 62 (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \ 63 (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \ 64 (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \ 65 (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \ 66 (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56))) 69 #define REMOTE_MSG_VERSION 1 70 #define ENDIAN_LOCAL 0xBADADBBD 72 struct remote_header_v0 {
97 static struct remote_header_v0 *
100 struct remote_header_v0 *header = (
struct remote_header_v0 *)remote->
buffer;
101 if(remote->
buffer_offset <
sizeof(
struct remote_header_v0)) {
109 crm_err(
"Invalid message detected, endian mismatch: %" PRIx32
110 " is neither %" PRIx32
" nor the swab'd %" PRIx32,
116 header->flags =
__swab64(header->flags);
117 header->endian =
__swab32(header->endian);
119 header->version =
__swab32(header->version);
120 header->size_total =
__swab32(header->size_total);
121 header->payload_offset =
__swab32(header->payload_offset);
122 header->payload_compressed =
__swab32(header->payload_compressed);
123 header->payload_uncompressed =
__swab32(header->payload_uncompressed);
129 #ifdef HAVE_GNUTLS_GNUTLS_H 132 pcmk__tls_client_handshake(
pcmk__remote_t *remote,
int timeout_ms)
136 time_t time_limit = time(NULL) + timeout_ms / 1000;
139 rc = gnutls_handshake(*remote->tls_session);
140 if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
144 crm_trace(
"TLS handshake poll failed: %s (%d)",
149 crm_trace(
"TLS handshake failed: %s (%d)",
150 gnutls_strerror(rc), rc);
155 }
while (time(NULL) < time_limit);
166 set_minimum_dh_bits(
const gnutls_session_t *session)
177 if (dh_min_bits > 0) {
178 crm_info(
"Requiring server use a Diffie-Hellman prime of at least %d bits",
180 gnutls_dh_set_prime_bits(*session, dh_min_bits);
185 get_bound_dh_bits(
unsigned int dh_bits)
195 if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
196 crm_warn(
"Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
199 if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
202 if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
220 pcmk__new_tls_session(
int csock,
unsigned int conn_type,
221 gnutls_credentials_type_t cred_type,
void *credentials)
223 int rc = GNUTLS_E_SUCCESS;
224 const char *prio_base = NULL;
226 gnutls_session_t *session = NULL;
236 if (prio_base == NULL) {
240 (cred_type == GNUTLS_CRD_ANON)?
"+ANON-DH" :
"+DHE-PSK:+PSK");
242 session = gnutls_malloc(
sizeof(gnutls_session_t));
243 if (session == NULL) {
244 rc = GNUTLS_E_MEMORY_ERROR;
248 rc = gnutls_init(session, conn_type);
249 if (rc != GNUTLS_E_SUCCESS) {
257 rc = gnutls_priority_set_direct(*session, prio, NULL);
258 if (rc != GNUTLS_E_SUCCESS) {
261 if (conn_type == GNUTLS_CLIENT) {
262 set_minimum_dh_bits(session);
265 gnutls_transport_set_ptr(*session,
266 (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
268 rc = gnutls_credentials_set(*session, cred_type, credentials);
269 if (rc != GNUTLS_E_SUCCESS) {
276 crm_err(
"Could not initialize %s TLS %s session: %s " 277 CRM_XS " rc=%d priority='%s'",
278 (cred_type == GNUTLS_CRD_ANON)?
"anonymous" :
"PSK",
279 (conn_type == GNUTLS_SERVER)?
"server" :
"client",
280 gnutls_strerror(rc), rc, prio);
282 if (session != NULL) {
283 gnutls_free(session);
304 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
306 int rc = GNUTLS_E_SUCCESS;
307 unsigned int dh_bits = 0;
309 rc = gnutls_dh_params_init(dh_params);
310 if (rc != GNUTLS_E_SUCCESS) {
314 dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
315 GNUTLS_SEC_PARAM_NORMAL);
317 rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
320 dh_bits = get_bound_dh_bits(dh_bits);
322 crm_info(
"Generating Diffie-Hellman parameters with %u-bit prime for TLS",
324 rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
325 if (rc != GNUTLS_E_SUCCESS) {
332 crm_err(
"Could not initialize Diffie-Hellman parameters for TLS: %s " 333 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
356 rc = gnutls_handshake(*client->
remote->tls_session);
357 }
while (rc == GNUTLS_E_INTERRUPTED);
359 if (rc == GNUTLS_E_AGAIN) {
364 }
else if (rc != GNUTLS_E_SUCCESS) {
365 crm_err(
"TLS handshake with remote client failed: %s " 366 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
374 send_tls(gnutls_session_t *session,
struct iovec *iov)
376 const char *unsent = iov->iov_base;
377 size_t unsent_len = iov->iov_len;
380 if (unsent == NULL) {
384 crm_trace(
"Sending TLS message of %llu bytes",
385 (
unsigned long long) unsent_len);
387 gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
389 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
390 crm_trace(
"Retrying to send %llu bytes remaining",
391 (
unsigned long long) unsent_len);
393 }
else if (gnutls_rc < 0) {
396 gnutls_strerror((
int) gnutls_rc),
397 (
long long) gnutls_rc);
400 }
else if (gnutls_rc < unsent_len) {
401 crm_trace(
"Sent %lld of %llu bytes remaining",
402 (
long long) gnutls_rc, (
unsigned long long) unsent_len);
403 unsent_len -= gnutls_rc;
406 crm_trace(
"Sent all %lld bytes remaining", (
long long) gnutls_rc);
416 send_plaintext(
int sock,
struct iovec *iov)
418 const char *unsent = iov->iov_base;
419 size_t unsent_len = iov->iov_len;
422 if (unsent == NULL) {
426 crm_debug(
"Sending plaintext message of %llu bytes to socket %d",
427 (
unsigned long long) unsent_len, sock);
429 write_rc = write(sock, unsent, unsent_len);
433 if ((errno == EINTR) || (errno == EAGAIN)) {
434 crm_trace(
"Retrying to send %llu bytes remaining to socket %d",
435 (
unsigned long long) unsent_len, sock);
444 }
else if (write_rc < unsent_len) {
445 crm_trace(
"Sent %lld of %llu bytes remaining",
446 (
long long) write_rc, (
unsigned long long) unsent_len);
448 unsent_len -= write_rc;
452 crm_trace(
"Sent all %lld bytes remaining: %.100s",
453 (
long long) write_rc, (
char *) (iov->iov_base));
462 remote_send_iovs(
pcmk__remote_t *remote,
struct iovec *iov,
int iovs)
466 for (
int lpc = 0; (lpc < iovs) && (rc ==
pcmk_rc_ok); lpc++) {
467 #ifdef HAVE_GNUTLS_GNUTLS_H 468 if (remote->tls_session) {
469 rc = send_tls(remote->tls_session, &(iov[lpc]));
474 rc = send_plaintext(remote->
tcp_socket, &(iov[lpc]));
476 rc = ESOCKTNOSUPPORT;
495 static uint64_t
id = 0;
496 char *xml_text = NULL;
499 struct remote_header_v0 *header;
501 CRM_CHECK((remote != NULL) && (msg != NULL),
return EINVAL);
504 CRM_CHECK(xml_text != NULL,
return EINVAL);
506 header = calloc(1,
sizeof(
struct remote_header_v0));
509 iov[0].iov_base = header;
510 iov[0].iov_len =
sizeof(
struct remote_header_v0);
512 iov[1].iov_base = xml_text;
513 iov[1].iov_len = 1 + strlen(xml_text);
519 header->payload_offset = iov[0].iov_len;
520 header->payload_uncompressed = iov[1].iov_len;
521 header->size_total = iov[0].iov_len + iov[1].iov_len;
523 rc = remote_send_iovs(remote, iov, 2);
525 crm_err(
"Could not send remote message: %s " CRM_XS " rc=%d",
529 free(iov[0].iov_base);
530 free(iov[1].iov_base);
547 struct remote_header_v0 *header = localized_remote_header(remote);
549 if (header == NULL) {
554 if (header->payload_compressed) {
556 unsigned int size_u = 1 + header->payload_uncompressed;
557 char *uncompressed = calloc(1, header->payload_offset + size_u);
559 crm_trace(
"Decompressing message data %d bytes into %d bytes",
560 header->payload_compressed, size_u);
562 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
563 remote->
buffer + header->payload_offset,
564 header->payload_compressed, 1, 0);
568 crm_warn(
"Couldn't decompress v%d message, we only understand v%d",
580 CRM_ASSERT(size_u == header->payload_uncompressed);
582 memcpy(uncompressed, remote->
buffer, header->payload_offset);
583 remote->
buffer_size = header->payload_offset + size_u;
586 remote->
buffer = uncompressed;
587 header = localized_remote_header(remote);
593 CRM_LOG_ASSERT(remote->
buffer[
sizeof(
struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
597 crm_warn(
"Couldn't parse v%d message, we only understand v%d",
600 }
else if (xml == NULL) {
601 crm_err(
"Couldn't parse: '%.120s'", remote->
buffer + header->payload_offset);
610 #ifdef HAVE_GNUTLS_GNUTLS_H 611 if (remote->tls_session) {
612 void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
614 return GPOINTER_TO_INT(sock_ptr);
622 crm_err(
"Remote connection type undetermined (bug?)");
640 struct pollfd fds = { 0, };
646 sock = get_remote_socket(remote);
661 if (errno == EINTR && (
timeout > 0)) {
662 timeout = timeout_ms - ((time(NULL) - start) * 1000);
669 }
while (rc < 0 && errno == EINTR);
693 size_t read_len =
sizeof(
struct remote_header_v0);
694 struct remote_header_v0 *header = localized_remote_header(remote);
695 bool received =
false;
700 read_len = header->size_total;
706 crm_trace(
"Expanding buffer to %llu bytes",
711 #ifdef HAVE_GNUTLS_GNUTLS_H 712 if (!received && remote->tls_session) {
713 read_rc = gnutls_record_recv(*(remote->tls_session),
716 if (read_rc == GNUTLS_E_INTERRUPTED) {
718 }
else if (read_rc == GNUTLS_E_AGAIN) {
720 }
else if (read_rc < 0) {
721 crm_debug(
"TLS receive failed: %s (%lld)",
722 gnutls_strerror(read_rc), (
long long) read_rc);
740 crm_err(
"Remote connection type undetermined (bug?)");
741 return ESOCKTNOSUPPORT;
749 crm_trace(
"Received %lld more bytes (%llu total)",
753 }
else if ((rc == EINTR) || (rc == EAGAIN)) {
754 crm_trace(
"No data available for non-blocking remote read: %s (%d)",
757 }
else if (read_rc == 0) {
758 crm_debug(
"End of remote data encountered after %llu bytes",
763 crm_debug(
"Error receiving remote data after %llu bytes: %s (%d)",
769 header = localized_remote_header(remote);
772 crm_trace(
"Read partial remote message (%llu of %u bytes)",
776 crm_trace(
"Read full remote message of %llu bytes",
799 time_t start = time(NULL);
800 int remaining_timeout = 0;
802 if (timeout_ms == 0) {
804 }
else if (timeout_ms < 0) {
808 remaining_timeout = timeout_ms;
809 while (remaining_timeout > 0) {
811 crm_trace(
"Waiting for remote data (%d ms of %d ms timeout remaining)",
812 remaining_timeout, timeout_ms);
816 crm_err(
"Timed out (%d ms) while waiting for remote data",
821 crm_debug(
"Wait for remote data aborted (will retry): %s " 825 rc = read_available_remote_data(remote);
828 }
else if (rc == EAGAIN) {
829 crm_trace(
"Waiting for more remote data");
837 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
841 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
846 struct tcp_async_cb_data {
851 void (*callback) (
void *userdata,
int rc,
int sock);
856 check_connect_finished(gpointer userdata)
858 struct tcp_async_cb_data *cb_data = userdata;
862 struct timeval ts = { 0, };
864 if (cb_data->start == 0) {
872 FD_SET(cb_data->sock, &rset);
874 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
878 if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
879 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
885 crm_trace(
"Could not check socket %d for connection success: %s (%d)",
888 }
else if (rc == 0) {
889 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
892 crm_debug(
"Timed out while waiting for socket %d connection success",
898 }
else if (FD_ISSET(cb_data->sock, &rset)
899 || FD_ISSET(cb_data->sock, &wset)) {
903 socklen_t len =
sizeof(error);
905 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
907 crm_trace(
"Couldn't check socket %d for connection errors: %s (%d)",
909 }
else if (error != 0) {
911 crm_trace(
"Socket %d connected with error: %s (%d)",
918 crm_trace(
"select() succeeded, but socket %d not in resulting " 919 "read/write sets", cb_data->sock);
925 crm_trace(
"Socket %d is connected", cb_data->sock);
927 close(cb_data->sock);
931 if (cb_data->callback) {
932 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
957 connect_socket_retry(
int sock,
const struct sockaddr *addr, socklen_t addrlen,
958 int timeout_ms,
int *timer_id,
void *userdata,
959 void (*callback) (
void *userdata,
int rc,
int sock))
964 struct tcp_async_cb_data *cb_data = NULL;
968 crm_warn(
"Could not set socket non-blocking: %s " CRM_XS " rc=%d",
973 rc = connect(sock, addr, addrlen);
974 if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
981 cb_data = calloc(1,
sizeof(
struct tcp_async_cb_data));
982 cb_data->userdata = userdata;
983 cb_data->callback = callback;
984 cb_data->sock = sock;
985 cb_data->timeout_ms = timeout_ms;
995 cb_data->start = time(NULL);
1007 crm_trace(
"Scheduling check in %dms for whether connect to fd %d finished",
1009 timer = g_timeout_add(interval, check_connect_finished, cb_data);
1030 connect_socket_once(
int sock,
const struct sockaddr *addr, socklen_t addrlen)
1032 int rc = connect(sock, addr, addrlen);
1043 crm_warn(
"Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1069 int *sock_fd,
void *userdata,
1070 void (*callback) (
void *userdata,
int rc,
int sock))
1072 char buffer[INET6_ADDRSTRLEN];
1073 struct addrinfo *res = NULL;
1074 struct addrinfo *rp = NULL;
1075 struct addrinfo hints;
1076 const char *server =
host;
1080 CRM_CHECK((
host != NULL) && (sock_fd != NULL),
return EINVAL);
1083 memset(&hints, 0,
sizeof(
struct addrinfo));
1084 hints.ai_family = AF_UNSPEC;
1085 hints.ai_socktype = SOCK_STREAM;
1086 hints.ai_flags = AI_CANONNAME;
1088 rc = getaddrinfo(server, NULL, &hints, &res);
1092 crm_err(
"Unable to get IP address info for %s: %s",
1097 if (!res || !res->ai_addr) {
1098 crm_err(
"Unable to get IP address info for %s: no result", server);
1104 for (rp = res; rp != NULL; rp = rp->ai_next) {
1105 struct sockaddr *addr = rp->ai_addr;
1111 if (rp->ai_canonname) {
1112 server = res->ai_canonname;
1116 sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
1119 crm_warn(
"Could not create socket for remote connection to %s:%d: " 1126 if (addr->sa_family == AF_INET6) {
1127 ((
struct sockaddr_in6 *)(
void*)addr)->sin6_port = htons(port);
1129 ((
struct sockaddr_in *)(
void*)addr)->sin_port = htons(port);
1134 crm_info(
"Attempting remote connection to %s:%d", buffer, port);
1137 if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
timeout,
1138 timer_id, userdata, callback) ==
pcmk_rc_ok) {
1142 }
else if (connect_socket_once(sock, rp->ai_addr,
1176 switch (((
const struct sockaddr *) sa)->sa_family) {
1178 inet_ntop(AF_INET, &(((
const struct sockaddr_in *) sa)->sin_addr),
1179 s, INET6_ADDRSTRLEN);
1184 &(((
const struct sockaddr_in6 *) sa)->sin6_addr),
1185 s, INET6_ADDRSTRLEN);
1189 strcpy(s,
"<invalid>");
1206 struct sockaddr_storage addr;
1207 socklen_t laddr =
sizeof(addr);
1208 char addr_str[INET6_ADDRSTRLEN];
1211 memset(&addr, 0,
sizeof(addr));
1212 *csock = accept(ssock, (
struct sockaddr *)&addr, &laddr);
1215 crm_err(
"Could not accept remote client connection: %s " 1220 crm_info(
"Accepted new remote client connection from %s", addr_str);
1224 crm_err(
"Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1231 #ifdef TCP_USER_TIMEOUT 1236 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1237 &optval,
sizeof(optval));
1240 crm_err(
"Could not set TCP timeout to %d ms on remote connection: " 1260 static int port = 0;
1267 port = strtol(env, NULL, 10);
1268 if (errno || (port < 1) || (port > 65535)) {
1270 " has invalid value '%s', using %d instead",
#define CRM_CHECK(expr, failure_action)
int pcmk__set_nonblocking(int fd)
const char * pcmk_strerror(int rc)
#define PCMK__ENV_DH_MIN_BITS
uint32_t payload_compressed
int pcmk__scan_min_int(const char *text, int *result, int minimum)
#define PCMK__ENV_REMOTE_PORT
uint32_t payload_uncompressed
#define CRM_LOG_ASSERT(expr)
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
const char * pcmk__env_option(const char *option)
Wrappers for and extensions to glib mainloop.
xmlNode * string2xml(const char *input)
void pcmk__sockaddr2str(const void *sa, char *s)
#define DEFAULT_REMOTE_PORT
#define PCMK_GNUTLS_PRIORITIES
#define PCMK__ENV_TLS_PRIORITIES
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
char * dump_xml_unformatted(const xmlNode *xml)
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
#define crm_trace(fmt, args...)
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
Wrappers for and extensions to libxml2.
int pcmk_legacy2rc(int legacy_rc)
struct tcp_async_cb_data __attribute__
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
#define REMOTE_MSG_VERSION
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
#define crm_err(fmt, args...)
int pcmk__accept_remote_connection(int ssock, int *csock)
#define PCMK__ENV_DH_MAX_BITS
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
struct pcmk__remote_s * remote
#define crm_info(fmt, args...)
long pcmk__get_sbd_timeout(void)
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)