pacemaker 3.0.1-16e74fc4da
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
remote.c
Go to the documentation of this file.
1/*
2 * Copyright 2008-2025 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10#include <crm_internal.h>
11#include <crm/crm.h>
12
13#include <sys/param.h>
14#include <stdio.h>
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <unistd.h>
18#include <sys/socket.h>
19#include <arpa/inet.h>
20#include <netinet/in.h>
21#include <netinet/ip.h>
22#include <netinet/tcp.h>
23#include <netdb.h>
24#include <stdlib.h>
25#include <errno.h>
26#include <inttypes.h> // PRIx32
27
28#include <glib.h>
29#include <bzlib.h>
30
32#include <crm/common/xml.h>
33#include <crm/common/mainloop.h>
36
37#include <gnutls/gnutls.h>
38
39/* Swab macros from linux/swab.h */
40#ifdef HAVE_LINUX_SWAB_H
41# include <linux/swab.h>
42#else
43/*
44 * casts are necessary for constants, because we never know how for sure
45 * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
46 */
47#define __swab16(x) ((uint16_t)( \
48 (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
49 (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
50
51#define __swab32(x) ((uint32_t)( \
52 (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
53 (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
54 (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
55 (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
56
57#define __swab64(x) ((uint64_t)( \
58 (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
59 (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
60 (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
61 (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
62 (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
63 (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
64 (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
65 (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
66#endif
67
68#define REMOTE_MSG_VERSION 1
69#define ENDIAN_LOCAL 0xBADADBBD
70
71struct remote_header_v0 {
72 uint32_t endian; /* Detect messages from hosts with different endian-ness */
73 uint32_t version;
74 uint64_t id;
75 uint64_t flags;
76 uint32_t size_total;
77 uint32_t payload_offset;
78 uint32_t payload_compressed;
79 uint32_t payload_uncompressed;
80
81 /* New fields get added here */
82
83} __attribute__ ((packed));
84
96static struct remote_header_v0 *
97localized_remote_header(pcmk__remote_t *remote)
98{
99 struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
100 if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
101 return NULL;
102
103 } else if(header->endian != ENDIAN_LOCAL) {
104 uint32_t endian = __swab32(header->endian);
105
107 if(endian != ENDIAN_LOCAL) {
108 crm_err("Invalid message detected, endian mismatch: %" PRIx32
109 " is neither %" PRIx32 " nor the swab'd %" PRIx32,
110 ENDIAN_LOCAL, header->endian, endian);
111 return NULL;
112 }
113
114 header->id = __swab64(header->id);
115 header->flags = __swab64(header->flags);
116 header->endian = __swab32(header->endian);
117
118 header->version = __swab32(header->version);
119 header->size_total = __swab32(header->size_total);
120 header->payload_offset = __swab32(header->payload_offset);
121 header->payload_compressed = __swab32(header->payload_compressed);
122 header->payload_uncompressed = __swab32(header->payload_uncompressed);
123 }
124
125 return header;
126}
127
128// \return Standard Pacemaker return code
129static int
130send_tls(gnutls_session_t session, struct iovec *iov)
131{
132 const char *unsent = iov->iov_base;
133 size_t unsent_len = iov->iov_len;
134 ssize_t gnutls_rc;
135
136 if (unsent == NULL) {
137 return EINVAL;
138 }
139
140 crm_trace("Sending TLS message of %zu bytes", unsent_len);
141
142 while (true) {
143 gnutls_rc = gnutls_record_send(session, unsent, unsent_len);
144
145 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
146 crm_trace("Retrying to send %zu bytes remaining", unsent_len);
147
148 } else if (gnutls_rc < 0) {
149 // Caller can log as error if necessary
150 crm_info("TLS connection terminated: %s " QB_XS " rc=%zd",
151 gnutls_strerror((int) gnutls_rc), gnutls_rc);
152 return ECONNABORTED;
153
154 } else if (gnutls_rc < unsent_len) {
155 crm_trace("Sent %zd of %zu bytes remaining", gnutls_rc, unsent_len);
156 unsent_len -= gnutls_rc;
157 unsent += gnutls_rc;
158 } else {
159 crm_trace("Sent all %zd bytes remaining", gnutls_rc);
160 break;
161 }
162 }
163 return pcmk_rc_ok;
164}
165
166// \return Standard Pacemaker return code
167static int
168send_plaintext(int sock, struct iovec *iov)
169{
170 const char *unsent = iov->iov_base;
171 size_t unsent_len = iov->iov_len;
172
173 if (unsent == NULL) {
174 return EINVAL;
175 }
176
177 crm_debug("Sending plaintext message of %zu bytes to socket %d",
178 unsent_len, sock);
179 while (true) {
180 ssize_t write_rc = write(sock, unsent, unsent_len);
181
182 if (write_rc < 0) {
183 int rc = errno;
184
185 if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
186 crm_trace("Retrying to send %zu bytes remaining to socket %d",
187 unsent_len, sock);
188 continue;
189 }
190
191 // Caller can log as error if necessary
192 crm_info("Could not send message: %s " QB_XS " rc=%d socket=%d",
193 pcmk_rc_str(rc), rc, sock);
194 return rc;
195
196 } else if (write_rc < unsent_len) {
197 crm_trace("Sent %zd of %zu bytes remaining", write_rc, unsent_len);
198 unsent += write_rc;
199 unsent_len -= write_rc;
200
201 } else {
202 crm_trace("Sent all %zd bytes remaining: %.100s",
203 write_rc, (char *) (iov->iov_base));
204 return pcmk_rc_ok;
205 }
206 }
207}
208
209// \return Standard Pacemaker return code
210static int
211remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
212{
213 int rc = pcmk_rc_ok;
214
215 for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
216 if (remote->tls_session) {
217 rc = send_tls(remote->tls_session, &(iov[lpc]));
218 continue;
219 }
220 if (remote->tcp_socket >= 0) {
221 rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
222 } else {
223 rc = ESOCKTNOSUPPORT;
224 }
225 }
226 return rc;
227}
228
238int
239pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
240{
241 int rc = pcmk_rc_ok;
242 static uint64_t id = 0;
243 GString *xml_text = NULL;
244
245 struct iovec iov[2];
246 struct remote_header_v0 *header;
247
248 CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
249
250 xml_text = g_string_sized_new(1024);
251 pcmk__xml_string(msg, 0, xml_text, 0);
252 CRM_CHECK(xml_text->len > 0,
253 g_string_free(xml_text, TRUE); return EINVAL);
254
255 header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
256
257 iov[0].iov_base = header;
258 iov[0].iov_len = sizeof(struct remote_header_v0);
259
260 iov[1].iov_len = 1 + xml_text->len;
261 iov[1].iov_base = g_string_free(xml_text, FALSE);
262
263 id++;
264 header->id = id;
265 header->endian = ENDIAN_LOCAL;
266 header->version = REMOTE_MSG_VERSION;
267 header->payload_offset = iov[0].iov_len;
268 header->payload_uncompressed = iov[1].iov_len;
269 header->size_total = iov[0].iov_len + iov[1].iov_len;
270
271 rc = remote_send_iovs(remote, iov, 2);
272 if (rc != pcmk_rc_ok) {
273 crm_err("Could not send remote message: %s " QB_XS " rc=%d",
274 pcmk_rc_str(rc), rc);
275 }
276
277 free(iov[0].iov_base);
278 g_free((gchar *) iov[1].iov_base);
279 return rc;
280}
281
291xmlNode *
293{
294 xmlNode *xml = NULL;
295 struct remote_header_v0 *header = localized_remote_header(remote);
296
297 if (header == NULL) {
298 return NULL;
299 }
300
301 /* Support compression on the receiving end now, in case we ever want to add it later */
302 if (header->payload_compressed) {
303 int rc = 0;
304 unsigned int size_u = 1 + header->payload_uncompressed;
305 char *uncompressed =
306 pcmk__assert_alloc(1, header->payload_offset + size_u);
307
308 crm_trace("Decompressing message data %d bytes into %d bytes",
309 header->payload_compressed, size_u);
310
311 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
312 remote->buffer + header->payload_offset,
313 header->payload_compressed, 1, 0);
314 rc = pcmk__bzlib2rc(rc);
315
316 if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
317 crm_warn("Couldn't decompress v%d message, we only understand v%d",
318 header->version, REMOTE_MSG_VERSION);
319 free(uncompressed);
320 return NULL;
321
322 } else if (rc != pcmk_rc_ok) {
323 crm_err("Decompression failed: %s " QB_XS " rc=%d",
324 pcmk_rc_str(rc), rc);
325 free(uncompressed);
326 return NULL;
327 }
328
329 pcmk__assert(size_u == header->payload_uncompressed);
330
331 memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
332 remote->buffer_size = header->payload_offset + size_u;
333
334 free(remote->buffer);
335 remote->buffer = uncompressed;
336 header = localized_remote_header(remote);
337 }
338
339 /* take ownership of the buffer */
340 remote->buffer_offset = 0;
341
342 CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
343
344 xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
345 if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
346 crm_warn("Couldn't parse v%d message, we only understand v%d",
347 header->version, REMOTE_MSG_VERSION);
348
349 } else if (xml == NULL) {
350 crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
351 }
352
353 crm_log_xml_trace(xml, "[remote msg]");
354 return xml;
355}
356
357static int
358get_remote_socket(const pcmk__remote_t *remote)
359{
360 if (remote->tls_session != NULL) {
361 return pcmk__tls_get_client_sock(remote);
362 }
363 if (remote->tcp_socket >= 0) {
364 return remote->tcp_socket;
365 }
366 crm_err("Remote connection type undetermined (bug?)");
367 return -1;
368}
369
381int
382pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
383{
384 struct pollfd fds = { 0, };
385 int sock = -1;
386 int rc = 0;
387 time_t start;
388 int timeout = timeout_ms;
389
390 sock = get_remote_socket(remote);
391 if (sock < 0) {
392 crm_trace("No longer connected");
393 return ENOTCONN;
394 }
395
396 start = time(NULL);
397 errno = 0;
398 do {
399 fds.fd = sock;
400 fds.events = POLLIN;
401
402 /* If we got an EINTR while polling, and we have a
403 * specific timeout we are trying to honor, attempt
404 * to adjust the timeout to the closest second. */
405 if (errno == EINTR && (timeout > 0)) {
406 timeout = timeout_ms - ((time(NULL) - start) * 1000);
407 if (timeout < 1000) {
408 timeout = 1000;
409 }
410 }
411
412 rc = poll(&fds, 1, timeout);
413 } while (rc < 0 && errno == EINTR);
414
415 if (rc < 0) {
416 return errno;
417 }
418 return (rc == 0)? ETIME : pcmk_rc_ok;
419}
420
433int
435{
436 int rc = pcmk_rc_ok;
437 size_t read_len = sizeof(struct remote_header_v0);
438 struct remote_header_v0 *header = localized_remote_header(remote);
439 ssize_t read_rc;
440
441 if(header) {
442 /* Stop at the end of the current message */
443 read_len = header->size_total;
444 }
445
446 /* automatically grow the buffer when needed */
447 if(remote->buffer_size < read_len) {
448 remote->buffer_size = 2 * read_len;
449 crm_trace("Expanding buffer to %zu bytes", remote->buffer_size);
450 remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
451 }
452
453 if (remote->tls_session) {
454 read_rc = gnutls_record_recv(remote->tls_session,
455 remote->buffer + remote->buffer_offset,
456 remote->buffer_size - remote->buffer_offset);
457 if (read_rc == GNUTLS_E_INTERRUPTED) {
458 rc = EINTR;
459 } else if (read_rc == GNUTLS_E_AGAIN) {
460 rc = EAGAIN;
461 } else if (read_rc < 0) {
462 crm_debug("TLS receive failed: %s (%zd)",
463 gnutls_strerror((int) read_rc), read_rc);
464 rc = EIO;
465 }
466 } else if (remote->tcp_socket >= 0) {
467 read_rc = read(remote->tcp_socket,
468 remote->buffer + remote->buffer_offset,
469 remote->buffer_size - remote->buffer_offset);
470 if (read_rc < 0) {
471 rc = errno;
472 }
473 } else {
474 crm_err("Remote connection type undetermined (bug?)");
475 return ESOCKTNOSUPPORT;
476 }
477
478 /* process any errors. */
479 if (read_rc > 0) {
480 remote->buffer_offset += read_rc;
481 /* always null terminate buffer, the +1 to alloc always allows for this. */
482 remote->buffer[remote->buffer_offset] = '\0';
483 crm_trace("Received %zd more bytes (%zu total)",
484 read_rc, remote->buffer_offset);
485
486 } else if (read_rc == 0) {
487 crm_debug("End of remote data encountered after %zu bytes",
488 remote->buffer_offset);
489 return ENOTCONN;
490
491 } else if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
492 crm_trace("No data available for non-blocking remote read: %s (%d)",
493 pcmk_rc_str(rc), rc);
494
495 } else {
496 crm_debug("Error receiving remote data after %zu bytes: %s (%d)",
497 remote->buffer_offset, pcmk_rc_str(rc), rc);
498 return ENOTCONN;
499 }
500
501 header = localized_remote_header(remote);
502 if(header) {
503 if(remote->buffer_offset < header->size_total) {
504 crm_trace("Read partial remote message (%zu of %" PRIu32 " bytes)",
505 remote->buffer_offset, header->size_total);
506 } else {
507 crm_trace("Read full remote message of %zu bytes",
508 remote->buffer_offset);
509 return pcmk_rc_ok;
510 }
511 }
512
513 return EAGAIN;
514}
515
526int
528{
529 int rc = pcmk_rc_ok;
530 time_t start = time(NULL);
531 int remaining_timeout = 0;
532
533 if (timeout_ms == 0) {
534 timeout_ms = 10000;
535 } else if (timeout_ms < 0) {
536 timeout_ms = 60000;
537 }
538
539 remaining_timeout = timeout_ms;
540 while (remaining_timeout > 0) {
541
542 crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
543 remaining_timeout, timeout_ms);
544 rc = pcmk__remote_ready(remote, remaining_timeout);
545
546 if (rc == ETIME) {
547 crm_err("Timed out (%d ms) while waiting for remote data",
548 remaining_timeout);
549 return rc;
550
551 } else if (rc != pcmk_rc_ok) {
552 crm_debug("Wait for remote data aborted (will retry): %s "
553 QB_XS " rc=%d", pcmk_rc_str(rc), rc);
554
555 } else {
557 if (rc == pcmk_rc_ok) {
558 return rc;
559 } else if (rc == EAGAIN) {
560 crm_trace("Waiting for more remote data");
561 } else {
562 crm_debug("Could not receive remote data: %s " QB_XS " rc=%d",
563 pcmk_rc_str(rc), rc);
564 }
565 }
566
567 // Don't waste time retrying after fatal errors
568 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
569 return rc;
570 }
571
572 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
573 }
574 return ETIME;
575}
576
577struct tcp_async_cb_data {
578 int sock;
579 int timeout_ms;
580 time_t start;
581 void *userdata;
582 void (*callback) (void *userdata, int rc, int sock);
584
585// \return TRUE if timer should be rescheduled, FALSE otherwise
586static gboolean
587check_connect_finished(gpointer userdata)
588{
589 struct tcp_async_cb_data *cb_data = userdata;
590 int rc;
591
592 fd_set rset, wset;
593 struct timeval ts = { 0, };
594
595 if (cb_data->start == 0) {
596 // Last connect() returned success immediately
597 rc = pcmk_rc_ok;
598 goto dispatch_done;
599 }
600
601 // If the socket is ready for reading or writing, the connect succeeded
602 FD_ZERO(&rset);
603 FD_SET(cb_data->sock, &rset);
604 wset = rset;
605 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
606
607 if (rc < 0) { // select() error
608 rc = errno;
609 if ((rc == EINTR) || (rc == EAGAIN)) {
610 if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
611 return TRUE; // There is time left, so reschedule timer
612 } else {
613 rc = ETIMEDOUT;
614 }
615 }
616 crm_trace("Could not check socket %d for connection success: %s (%d)",
617 cb_data->sock, pcmk_rc_str(rc), rc);
618
619 } else if (rc == 0) { // select() timeout
620 if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
621 return TRUE; // There is time left, so reschedule timer
622 }
623 crm_debug("Timed out while waiting for socket %d connection success",
624 cb_data->sock);
625 rc = ETIMEDOUT;
626
627 // select() returned number of file descriptors that are ready
628
629 } else if (FD_ISSET(cb_data->sock, &rset)
630 || FD_ISSET(cb_data->sock, &wset)) {
631
632 // The socket is ready; check it for connection errors
633 int error = 0;
634 socklen_t len = sizeof(error);
635
636 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
637 rc = errno;
638 crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
639 cb_data->sock, pcmk_rc_str(rc), rc);
640 } else if (error != 0) {
641 rc = error;
642 crm_trace("Socket %d connected with error: %s (%d)",
643 cb_data->sock, pcmk_rc_str(rc), rc);
644 } else {
645 rc = pcmk_rc_ok;
646 }
647
648 } else { // Should not be possible
649 crm_trace("select() succeeded, but socket %d not in resulting "
650 "read/write sets", cb_data->sock);
651 rc = EAGAIN;
652 }
653
654 dispatch_done:
655 if (rc == pcmk_rc_ok) {
656 crm_trace("Socket %d is connected", cb_data->sock);
657 } else {
658 close(cb_data->sock);
659 cb_data->sock = -1;
660 }
661
662 if (cb_data->callback) {
663 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
664 }
665 free(cb_data);
666 return FALSE; // Do not reschedule timer
667}
668
687static int
688connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
689 int timeout_ms, int *timer_id, void *userdata,
690 void (*callback) (void *userdata, int rc, int sock))
691{
692 int rc = 0;
693 int interval = 500;
694 int timer;
695 struct tcp_async_cb_data *cb_data = NULL;
696
697 rc = pcmk__set_nonblocking(sock);
698 if (rc != pcmk_rc_ok) {
699 crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
700 pcmk_rc_str(rc), rc);
701 return rc;
702 }
703
704 rc = connect(sock, addr, addrlen);
705 if (rc < 0) {
706 rc = errno;
707 switch (rc) {
708 case EINTR:
709 case EINPROGRESS:
710 case EAGAIN:
711 break;
712
713 default:
714 crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
715 pcmk_rc_str(rc), rc);
716 return rc;
717 }
718 }
719
720 cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
721 cb_data->userdata = userdata;
722 cb_data->callback = callback;
723 cb_data->sock = sock;
724 cb_data->timeout_ms = timeout_ms;
725
726 if (rc == 0) {
727 /* The connect was successful immediately, we still return to mainloop
728 * and let this callback get called later. This avoids the user of this api
729 * to have to account for the fact the callback could be invoked within this
730 * function before returning. */
731 cb_data->start = 0;
732 interval = 1;
733 } else {
734 cb_data->start = time(NULL);
735 }
736
737 /* This timer function does a non-blocking poll on the socket to see if we
738 * can use it. Once we can, the connect has completed. This method allows us
739 * to connect without blocking the mainloop.
740 *
741 * @TODO Use a mainloop fd callback for this instead of polling. Something
742 * about the way mainloop is currently polling prevents this from
743 * working at the moment though. (See connect(2) regarding EINPROGRESS
744 * for possible new handling needed.)
745 */
746 crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
747 interval, sock);
748 timer = pcmk__create_timer(interval, check_connect_finished, cb_data);
749 if (timer_id) {
750 *timer_id = timer;
751 }
752
753 return pcmk_rc_ok;
754}
755
766static int
767connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
768{
769 int rc = connect(sock, addr, addrlen);
770
771 if (rc < 0) {
772 rc = errno;
773 crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
774 pcmk_rc_str(rc), rc);
775 return rc;
776 }
777
778 rc = pcmk__set_nonblocking(sock);
779 if (rc != pcmk_rc_ok) {
780 crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
781 pcmk_rc_str(rc), rc);
782 return rc;
783 }
784
785 return pcmk_ok;
786}
787
804int
805pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
806 int *sock_fd, void *userdata,
807 void (*callback) (void *userdata, int rc, int sock))
808{
809 char buffer[INET6_ADDRSTRLEN];
810 struct addrinfo *res = NULL;
811 struct addrinfo *rp = NULL;
812 struct addrinfo hints;
813 const char *server = host;
814 int rc;
815 int sock = -1;
816
817 CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);
818
819 // Get host's IP address(es)
820 memset(&hints, 0, sizeof(struct addrinfo));
821 hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
822 hints.ai_socktype = SOCK_STREAM;
823 hints.ai_flags = AI_CANONNAME;
824
825 rc = getaddrinfo(server, NULL, &hints, &res);
826 rc = pcmk__gaierror2rc(rc);
827
828 if (rc != pcmk_rc_ok) {
829 crm_err("Unable to get IP address info for %s: %s",
830 server, pcmk_rc_str(rc));
831 goto async_cleanup;
832 }
833
834 if (!res || !res->ai_addr) {
835 crm_err("Unable to get IP address info for %s: no result", server);
836 rc = ENOTCONN;
837 goto async_cleanup;
838 }
839
840 // getaddrinfo() returns a list of host's addresses, try them in order
841 for (rp = res; rp != NULL; rp = rp->ai_next) {
842 struct sockaddr *addr = rp->ai_addr;
843
844 if (!addr) {
845 continue;
846 }
847
848 if (rp->ai_canonname) {
849 server = res->ai_canonname;
850 }
851 crm_debug("Got canonical name %s for %s", server, host);
852
853 sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
854 if (sock == -1) {
855 rc = errno;
856 crm_warn("Could not create socket for remote connection to %s:%d: "
857 "%s " QB_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
858 continue;
859 }
860
861 /* Set port appropriately for address family */
862 /* (void*) casts avoid false-positive compiler alignment warnings */
863 if (addr->sa_family == AF_INET6) {
864 ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
865 } else {
866 ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
867 }
868
869 memset(buffer, 0, PCMK__NELEM(buffer));
870 pcmk__sockaddr2str(addr, buffer);
871 crm_info("Attempting remote connection to %s:%d", buffer, port);
872
873 if (callback) {
874 if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
875 timer_id, userdata, callback) == pcmk_rc_ok) {
876 goto async_cleanup; /* Success for now, we'll hear back later in the callback */
877 }
878
879 } else if (connect_socket_once(sock, rp->ai_addr,
880 rp->ai_addrlen) == pcmk_rc_ok) {
881 break; /* Success */
882 }
883
884 // Connect failed
885 close(sock);
886 sock = -1;
887 rc = ENOTCONN;
888 }
889
890async_cleanup:
891
892 if (res) {
893 freeaddrinfo(res);
894 }
895 *sock_fd = sock;
896 return rc;
897}
898
910void
911pcmk__sockaddr2str(const void *sa, char *s)
912{
913 switch (((const struct sockaddr *) sa)->sa_family) {
914 case AF_INET:
915 inet_ntop(AF_INET, &(((const struct sockaddr_in *) sa)->sin_addr),
916 s, INET6_ADDRSTRLEN);
917 break;
918
919 case AF_INET6:
920 inet_ntop(AF_INET6,
921 &(((const struct sockaddr_in6 *) sa)->sin6_addr),
922 s, INET6_ADDRSTRLEN);
923 break;
924
925 default:
926 strcpy(s, "<invalid>");
927 }
928}
929
939int
940pcmk__accept_remote_connection(int ssock, int *csock)
941{
942 int rc;
943 struct sockaddr_storage addr;
944 socklen_t laddr = sizeof(addr);
945 char addr_str[INET6_ADDRSTRLEN];
946#ifdef TCP_USER_TIMEOUT
947 long sbd_timeout = 0;
948#endif
949
950 /* accept the connection */
951 memset(&addr, 0, sizeof(addr));
952 *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
953 if (*csock == -1) {
954 rc = errno;
955 crm_err("Could not accept remote client connection: %s "
956 QB_XS " rc=%d", pcmk_rc_str(rc), rc);
957 return rc;
958 }
959 pcmk__sockaddr2str(&addr, addr_str);
960 crm_info("Accepted new remote client connection from %s", addr_str);
961
962 rc = pcmk__set_nonblocking(*csock);
963 if (rc != pcmk_rc_ok) {
964 crm_err("Could not set socket non-blocking: %s " QB_XS " rc=%d",
965 pcmk_rc_str(rc), rc);
966 close(*csock);
967 *csock = -1;
968 return rc;
969 }
970
971#ifdef TCP_USER_TIMEOUT
972 sbd_timeout = pcmk__get_sbd_watchdog_timeout();
973 if (sbd_timeout > 0) {
974 // Time to fail and retry before watchdog
975 long half = sbd_timeout / 2;
976 unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
977
978 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
979 &optval, sizeof(optval));
980 if (rc < 0) {
981 rc = errno;
982 crm_err("Could not set TCP timeout to %d ms on remote connection: "
983 "%s " QB_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
984 close(*csock);
985 *csock = -1;
986 return rc;
987 }
988 }
989#endif
990
991 return rc;
992}
993
999int
1001{
1002 static int port = 0;
1003
1004 if (port == 0) {
1005 const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1006
1007 if (env) {
1008 errno = 0;
1009 port = strtol(env, NULL, 10);
1010 if (errno || (port < 1) || (port > 65535)) {
1011 crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1012 " has invalid value '%s', using %d instead",
1013 env, DEFAULT_REMOTE_PORT);
1014 port = DEFAULT_REMOTE_PORT;
1015 }
1016 } else {
1017 port = DEFAULT_REMOTE_PORT;
1018 }
1019 }
1020 return port;
1021}
guint pcmk__timeout_ms2s(guint timeout_ms)
Definition utils.c:429
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
Definition utils.c:405
#define PCMK__NELEM(a)
Definition internal.h:50
#define pcmk__assert_alloc(nmemb, size)
Definition internal.h:246
struct tcp_async_cb_data __attribute__
#define ENDIAN_LOCAL
Definition remote.c:69
#define __swab32(x)
Definition remote.c:51
uint32_t endian
Definition remote.c:0
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition remote.c:382
#define REMOTE_MSG_VERSION
Definition remote.c:68
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition remote.c:1000
uint64_t id
Definition remote.c:2
int pcmk__read_available_remote_data(pcmk__remote_t *remote)
Definition remote.c:434
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
Definition remote.c:239
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition remote.c:940
#define __swab64(x)
Definition remote.c:57
void pcmk__sockaddr2str(const void *sa, char *s)
Definition remote.c:911
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition remote.c:292
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition remote.c:805
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition remote.c:527
pcmk__cpg_host_t host
Definition cpg.c:4
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition io.c:525
#define crm_info(fmt, args...)
Definition logging.h:365
#define crm_warn(fmt, args...)
Definition logging.h:360
#define CRM_LOG_ASSERT(expr)
Definition logging.h:196
#define CRM_CHECK(expr, failure_action)
Definition logging.h:213
#define crm_debug(fmt, args...)
Definition logging.h:368
#define crm_err(fmt, args...)
Definition logging.h:357
#define crm_log_xml_trace(xml, text)
Definition logging.h:378
#define crm_trace(fmt, args...)
Definition logging.h:370
#define DEFAULT_REMOTE_PORT
Definition lrmd.h:56
Wrappers for and extensions to glib mainloop.
#define PCMK__ENV_REMOTE_PORT
long pcmk__get_sbd_watchdog_timeout(void)
Definition watchdog.c:197
const char * pcmk__env_option(const char *option)
Definition options.c:1085
unsigned int timeout
Definition pcmk_fence.c:34
#define ETIME
Definition portability.h:66
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition results.c:617
@ pcmk_rc_ok
Definition results.h:159
#define pcmk_ok
Definition results.h:65
#define pcmk__assert(expr)
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
Definition results.c:987
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition results.c:1028
gnutls_session_t tls_session
int pcmk__tls_get_client_sock(const pcmk__remote_t *remote)
Definition tls.c:418
Wrappers for and extensions to libxml2.
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition xml_io.c:370
xmlNode * pcmk__xml_parse(const char *input)
Definition xml_io.c:167