This source file includes following definitions.
- localized_remote_header
- pcmk__tls_client_handshake
- set_minimum_dh_bits
- get_bound_dh_bits
- pcmk__new_tls_session
- pcmk__init_tls_dh
- pcmk__read_handshake_data
- send_tls
- send_plaintext
- remote_send_iovs
- pcmk__remote_send_xml
- pcmk__remote_message_xml
- get_remote_socket
- pcmk__remote_ready
- read_available_remote_data
- pcmk__read_remote_message
- check_connect_finished
- connect_socket_retry
- connect_socket_once
- pcmk__connect_remote
- pcmk__sockaddr2str
- pcmk__accept_remote_connection
- crm_default_remote_port
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h>
27
28 #include <glib.h>
29 #include <bzlib.h>
30
31 #include <crm/common/ipc_internal.h>
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/remote_internal.h>
35
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39
40
41 #ifdef HAVE_LINUX_SWAB_H
42 # include <linux/swab.h>
43 #else
44
45
46
47
48 #define __swab16(x) ((uint16_t)( \
49 (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
50 (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
51
52 #define __swab32(x) ((uint32_t)( \
53 (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
54 (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
55 (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
56 (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
57
58 #define __swab64(x) ((uint64_t)( \
59 (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
60 (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
61 (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
62 (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
63 (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
64 (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
65 (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
66 (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
67 #endif
68
69 #define REMOTE_MSG_VERSION 1
70 #define ENDIAN_LOCAL 0xBADADBBD
71
72 struct remote_header_v0 {
73 uint32_t endian;
74 uint32_t version;
75 uint64_t id;
76 uint64_t flags;
77 uint32_t size_total;
78 uint32_t payload_offset;
79 uint32_t payload_compressed;
80 uint32_t payload_uncompressed;
81
82
83
84 } __attribute__ ((packed));
85
86
87
88
89
90
91
92
93
94
95
96
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100 struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101 if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102 return NULL;
103
104 } else if(header->endian != ENDIAN_LOCAL) {
105 uint32_t endian = __swab32(header->endian);
106
107 CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
108 if(endian != ENDIAN_LOCAL) {
109 crm_err("Invalid message detected, endian mismatch: %" X32T
110 " is neither %" X32T " nor the swab'd %" X32T,
111 ENDIAN_LOCAL, header->endian, endian);
112 return NULL;
113 }
114
115 header->id = __swab64(header->id);
116 header->flags = __swab64(header->flags);
117 header->endian = __swab32(header->endian);
118
119 header->version = __swab32(header->version);
120 header->size_total = __swab32(header->size_total);
121 header->payload_offset = __swab32(header->payload_offset);
122 header->payload_compressed = __swab32(header->payload_compressed);
123 header->payload_uncompressed = __swab32(header->payload_uncompressed);
124 }
125
126 return header;
127 }
128
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130
131 int
132 pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
133 {
134 int rc = 0;
135 int pollrc = 0;
136 time_t time_limit = time(NULL) + timeout_ms / 1000;
137
138 do {
139 rc = gnutls_handshake(*remote->tls_session);
140 if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
141 pollrc = pcmk__remote_ready(remote, 1000);
142 if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
143
144 crm_trace("TLS handshake poll failed: %s (%d)",
145 pcmk_strerror(pollrc), pollrc);
146 return pcmk_legacy2rc(pollrc);
147 }
148 } else if (rc < 0) {
149 crm_trace("TLS handshake failed: %s (%d)",
150 gnutls_strerror(rc), rc);
151 return EPROTO;
152 } else {
153 return pcmk_rc_ok;
154 }
155 } while (time(NULL) < time_limit);
156 return ETIME;
157 }
158
159
160
161
162
163
164
165 static void
166 set_minimum_dh_bits(gnutls_session_t *session)
167 {
168 const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
169
170 if (dh_min_bits_s) {
171 int dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
172
173
174
175
176
177 if (dh_min_bits > 0) {
178 crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
179 dh_min_bits);
180 gnutls_dh_set_prime_bits(*session, dh_min_bits);
181 }
182 }
183 }
184
185 static unsigned int
186 get_bound_dh_bits(unsigned int dh_bits)
187 {
188 const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
189 const char *dh_max_bits_s = getenv("PCMK_dh_max_bits");
190 int dh_min_bits = 0;
191 int dh_max_bits = 0;
192
193 if (dh_min_bits_s) {
194 dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
195 }
196 if (dh_max_bits_s) {
197 dh_max_bits = crm_parse_int(dh_max_bits_s, "0");
198 if ((dh_min_bits > 0) && (dh_max_bits > 0)
199 && (dh_max_bits < dh_min_bits)) {
200 crm_warn("Ignoring PCMK_dh_max_bits because it is less than PCMK_dh_min_bits");
201 dh_max_bits = 0;
202 }
203 }
204 if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
205 return dh_min_bits;
206 }
207 if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
208 return dh_max_bits;
209 }
210 return dh_bits;
211 }
212
213
214
215
216
217
218
219
220
221
222
223
224 gnutls_session_t *
225 pcmk__new_tls_session(int csock, unsigned int conn_type,
226 gnutls_credentials_type_t cred_type, void *credentials)
227 {
228 int rc = GNUTLS_E_SUCCESS;
229 const char *prio_base = NULL;
230 char *prio = NULL;
231 gnutls_session_t *session = NULL;
232
233
234
235
236
237
238
239
240 prio_base = getenv("PCMK_tls_priorities");
241 if (prio_base == NULL) {
242 prio_base = PCMK_GNUTLS_PRIORITIES;
243 }
244 prio = crm_strdup_printf("%s:%s", prio_base,
245 (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
246
247 session = gnutls_malloc(sizeof(gnutls_session_t));
248 if (session == NULL) {
249 rc = GNUTLS_E_MEMORY_ERROR;
250 goto error;
251 }
252
253 rc = gnutls_init(session, conn_type);
254 if (rc != GNUTLS_E_SUCCESS) {
255 goto error;
256 }
257
258
259
260
261
262 rc = gnutls_priority_set_direct(*session, prio, NULL);
263 if (rc != GNUTLS_E_SUCCESS) {
264 goto error;
265 }
266 if (conn_type == GNUTLS_CLIENT) {
267 set_minimum_dh_bits(session);
268 }
269
270 gnutls_transport_set_ptr(*session,
271 (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
272
273 rc = gnutls_credentials_set(*session, cred_type, credentials);
274 if (rc != GNUTLS_E_SUCCESS) {
275 goto error;
276 }
277 free(prio);
278 return session;
279
280 error:
281 crm_err("Could not initialize %s TLS %s session: %s "
282 CRM_XS " rc=%d priority='%s'",
283 (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
284 (conn_type == GNUTLS_SERVER)? "server" : "client",
285 gnutls_strerror(rc), rc, prio);
286 free(prio);
287 if (session != NULL) {
288 gnutls_free(session);
289 }
290 return NULL;
291 }
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308 int
309 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
310 {
311 int rc = GNUTLS_E_SUCCESS;
312 unsigned int dh_bits = 0;
313
314 rc = gnutls_dh_params_init(dh_params);
315 if (rc != GNUTLS_E_SUCCESS) {
316 goto error;
317 }
318
319 #ifdef HAVE_GNUTLS_SEC_PARAM_TO_PK_BITS
320 dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
321 GNUTLS_SEC_PARAM_NORMAL);
322 if (dh_bits == 0) {
323 rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
324 goto error;
325 }
326 #else
327 dh_bits = 1024;
328 #endif
329 dh_bits = get_bound_dh_bits(dh_bits);
330
331 crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
332 dh_bits);
333 rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
334 if (rc != GNUTLS_E_SUCCESS) {
335 goto error;
336 }
337
338 return pcmk_rc_ok;
339
340 error:
341 crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
342 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
343 return EPROTO;
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357 int
358 pcmk__read_handshake_data(pcmk__client_t *client)
359 {
360 int rc = 0;
361
362 CRM_ASSERT(client && client->remote && client->remote->tls_session);
363
364 do {
365 rc = gnutls_handshake(*client->remote->tls_session);
366 } while (rc == GNUTLS_E_INTERRUPTED);
367
368 if (rc == GNUTLS_E_AGAIN) {
369
370
371
372 return EAGAIN;
373 } else if (rc != GNUTLS_E_SUCCESS) {
374 crm_err("TLS handshake with remote client failed: %s "
375 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
376 return EPROTO;
377 }
378 return pcmk_rc_ok;
379 }
380
381
382 static int
383 send_tls(gnutls_session_t *session, struct iovec *iov)
384 {
385 const char *unsent = iov->iov_base;
386 size_t unsent_len = iov->iov_len;
387 ssize_t gnutls_rc;
388
389 if (unsent == NULL) {
390 return EINVAL;
391 }
392
393 crm_trace("Sending TLS message of %llu bytes",
394 (unsigned long long) unsent_len);
395 while (true) {
396 gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
397
398 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
399 crm_trace("Retrying to send %llu bytes remaining",
400 (unsigned long long) unsent_len);
401
402 } else if (gnutls_rc < 0) {
403
404 crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
405 gnutls_strerror((int) gnutls_rc),
406 (long long) gnutls_rc);
407 return ECONNABORTED;
408
409 } else if (gnutls_rc < unsent_len) {
410 crm_trace("Sent %lld of %llu bytes remaining",
411 (long long) gnutls_rc, (unsigned long long) unsent_len);
412 unsent_len -= gnutls_rc;
413 unsent += gnutls_rc;
414 } else {
415 crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
416 break;
417 }
418 }
419 return pcmk_rc_ok;
420 }
421 #endif
422
423
424 static int
425 send_plaintext(int sock, struct iovec *iov)
426 {
427 const char *unsent = iov->iov_base;
428 size_t unsent_len = iov->iov_len;
429 ssize_t write_rc;
430
431 if (unsent == NULL) {
432 return EINVAL;
433 }
434
435 crm_debug("Sending plaintext message of %llu bytes to socket %d",
436 (unsigned long long) unsent_len, sock);
437 while (true) {
438 write_rc = write(sock, unsent, unsent_len);
439 if (write_rc < 0) {
440 int rc = errno;
441
442 if ((errno == EINTR) || (errno == EAGAIN)) {
443 crm_trace("Retrying to send %llu bytes remaining to socket %d",
444 (unsigned long long) unsent_len, sock);
445 continue;
446 }
447
448
449 crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
450 pcmk_rc_str(rc), rc, sock);
451 return rc;
452
453 } else if (write_rc < unsent_len) {
454 crm_trace("Sent %lld of %llu bytes remaining",
455 (long long) write_rc, (unsigned long long) unsent_len);
456 unsent += write_rc;
457 unsent_len -= write_rc;
458 continue;
459
460 } else {
461 crm_trace("Sent all %lld bytes remaining: %.100s",
462 (long long) write_rc, (char *) (iov->iov_base));
463 break;
464 }
465 }
466 return pcmk_rc_ok;
467 }
468
469
470 static int
471 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
472 {
473 int rc = pcmk_rc_ok;
474
475 for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
476 #ifdef HAVE_GNUTLS_GNUTLS_H
477 if (remote->tls_session) {
478 rc = send_tls(remote->tls_session, &(iov[lpc]));
479 continue;
480 }
481 #endif
482 if (remote->tcp_socket) {
483 rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
484 } else {
485 rc = ESOCKTNOSUPPORT;
486 }
487 }
488 return rc;
489 }
490
491
492
493
494
495
496
497
498
499
500 int
501 pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg)
502 {
503 int rc = pcmk_rc_ok;
504 static uint64_t id = 0;
505 char *xml_text = NULL;
506
507 struct iovec iov[2];
508 struct remote_header_v0 *header;
509
510 CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
511
512 xml_text = dump_xml_unformatted(msg);
513 CRM_CHECK(xml_text != NULL, return EINVAL);
514
515 header = calloc(1, sizeof(struct remote_header_v0));
516 CRM_ASSERT(header != NULL);
517
518 iov[0].iov_base = header;
519 iov[0].iov_len = sizeof(struct remote_header_v0);
520
521 iov[1].iov_base = xml_text;
522 iov[1].iov_len = 1 + strlen(xml_text);
523
524 id++;
525 header->id = id;
526 header->endian = ENDIAN_LOCAL;
527 header->version = REMOTE_MSG_VERSION;
528 header->payload_offset = iov[0].iov_len;
529 header->payload_uncompressed = iov[1].iov_len;
530 header->size_total = iov[0].iov_len + iov[1].iov_len;
531
532 rc = remote_send_iovs(remote, iov, 2);
533 if (rc != pcmk_rc_ok) {
534 crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
535 pcmk_rc_str(rc), rc);
536 }
537
538 free(iov[0].iov_base);
539 free(iov[1].iov_base);
540 return rc;
541 }
542
543
544
545
546
547
548
549
550
551
552 xmlNode *
553 pcmk__remote_message_xml(pcmk__remote_t *remote)
554 {
555 xmlNode *xml = NULL;
556 struct remote_header_v0 *header = localized_remote_header(remote);
557
558 if (header == NULL) {
559 return NULL;
560 }
561
562
563 if (header->payload_compressed) {
564 int rc = 0;
565 unsigned int size_u = 1 + header->payload_uncompressed;
566 char *uncompressed = calloc(1, header->payload_offset + size_u);
567
568 crm_trace("Decompressing message data %d bytes into %d bytes",
569 header->payload_compressed, size_u);
570
571 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
572 remote->buffer + header->payload_offset,
573 header->payload_compressed, 1, 0);
574
575 if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
576 crm_warn("Couldn't decompress v%d message, we only understand v%d",
577 header->version, REMOTE_MSG_VERSION);
578 free(uncompressed);
579 return NULL;
580
581 } else if (rc != BZ_OK) {
582 crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
583 bz2_strerror(rc), rc);
584 free(uncompressed);
585 return NULL;
586 }
587
588 CRM_ASSERT(size_u == header->payload_uncompressed);
589
590 memcpy(uncompressed, remote->buffer, header->payload_offset);
591 remote->buffer_size = header->payload_offset + size_u;
592
593 free(remote->buffer);
594 remote->buffer = uncompressed;
595 header = localized_remote_header(remote);
596 }
597
598
599 remote->buffer_offset = 0;
600
601 CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
602
603 xml = string2xml(remote->buffer + header->payload_offset);
604 if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
605 crm_warn("Couldn't parse v%d message, we only understand v%d",
606 header->version, REMOTE_MSG_VERSION);
607
608 } else if (xml == NULL) {
609 crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
610 }
611
612 return xml;
613 }
614
615 static int
616 get_remote_socket(pcmk__remote_t *remote)
617 {
618 #ifdef HAVE_GNUTLS_GNUTLS_H
619 if (remote->tls_session) {
620 void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
621
622 return GPOINTER_TO_INT(sock_ptr);
623 }
624 #endif
625
626 if (remote->tcp_socket) {
627 return remote->tcp_socket;
628 }
629
630 crm_err("Remote connection type undetermined (bug?)");
631 return -1;
632 }
633
634
635
636
637
638
639
640
641
642
643
644
645 int
646 pcmk__remote_ready(pcmk__remote_t *remote, int timeout_ms)
647 {
648 struct pollfd fds = { 0, };
649 int sock = 0;
650 int rc = 0;
651 time_t start;
652 int timeout = timeout_ms;
653
654 sock = get_remote_socket(remote);
655 if (sock <= 0) {
656 crm_trace("No longer connected");
657 return ENOTCONN;
658 }
659
660 start = time(NULL);
661 errno = 0;
662 do {
663 fds.fd = sock;
664 fds.events = POLLIN;
665
666
667
668
669 if (errno == EINTR && (timeout > 0)) {
670 timeout = timeout_ms - ((time(NULL) - start) * 1000);
671 if (timeout < 1000) {
672 timeout = 1000;
673 }
674 }
675
676 rc = poll(&fds, 1, timeout);
677 } while (rc < 0 && errno == EINTR);
678
679 if (rc < 0) {
680 return errno;
681 }
682 return (rc == 0)? ETIME : pcmk_rc_ok;
683 }
684
685
686
687
688
689
690
691
692
693
694
695
696
697 static int
698 read_available_remote_data(pcmk__remote_t *remote)
699 {
700 int rc = pcmk_rc_ok;
701 size_t read_len = sizeof(struct remote_header_v0);
702 struct remote_header_v0 *header = localized_remote_header(remote);
703 bool received = false;
704 ssize_t read_rc;
705
706 if(header) {
707
708 read_len = header->size_total;
709 }
710
711
712 if(remote->buffer_size < read_len) {
713 remote->buffer_size = 2 * read_len;
714 crm_trace("Expanding buffer to %llu bytes",
715 (unsigned long long) remote->buffer_size);
716 remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
717 }
718
719 #ifdef HAVE_GNUTLS_GNUTLS_H
720 if (!received && remote->tls_session) {
721 read_rc = gnutls_record_recv(*(remote->tls_session),
722 remote->buffer + remote->buffer_offset,
723 remote->buffer_size - remote->buffer_offset);
724 if (read_rc == GNUTLS_E_INTERRUPTED) {
725 rc = EINTR;
726 } else if (read_rc == GNUTLS_E_AGAIN) {
727 rc = EAGAIN;
728 } else if (read_rc < 0) {
729 crm_debug("TLS receive failed: %s (%lld)",
730 gnutls_strerror(read_rc), (long long) read_rc);
731 rc = EIO;
732 }
733 received = true;
734 }
735 #endif
736
737 if (!received && remote->tcp_socket) {
738 read_rc = read(remote->tcp_socket,
739 remote->buffer + remote->buffer_offset,
740 remote->buffer_size - remote->buffer_offset);
741 if (read_rc < 0) {
742 rc = errno;
743 }
744 received = true;
745 }
746
747 if (!received) {
748 crm_err("Remote connection type undetermined (bug?)");
749 return ESOCKTNOSUPPORT;
750 }
751
752
753 if (read_rc > 0) {
754 remote->buffer_offset += read_rc;
755
756 remote->buffer[remote->buffer_offset] = '\0';
757 crm_trace("Received %lld more bytes (%llu total)",
758 (long long) read_rc,
759 (unsigned long long) remote->buffer_offset);
760
761 } else if ((rc == EINTR) || (rc == EAGAIN)) {
762 crm_trace("No data available for non-blocking remote read: %s (%d)",
763 pcmk_rc_str(rc), rc);
764
765 } else if (read_rc == 0) {
766 crm_debug("End of remote data encountered after %llu bytes",
767 (unsigned long long) remote->buffer_offset);
768 return ENOTCONN;
769
770 } else {
771 crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
772 (unsigned long long) remote->buffer_offset,
773 pcmk_rc_str(rc), rc);
774 return ENOTCONN;
775 }
776
777 header = localized_remote_header(remote);
778 if(header) {
779 if(remote->buffer_offset < header->size_total) {
780 crm_trace("Read partial remote message (%llu of %u bytes)",
781 (unsigned long long) remote->buffer_offset,
782 header->size_total);
783 } else {
784 crm_trace("Read full remote message of %llu bytes",
785 (unsigned long long) remote->buffer_offset);
786 return pcmk_rc_ok;
787 }
788 }
789
790 return EAGAIN;
791 }
792
793
794
795
796
797
798
799
800
801
802
803 int
804 pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
805 {
806 int rc = pcmk_rc_ok;
807 time_t start = time(NULL);
808 int remaining_timeout = 0;
809
810 if (timeout_ms == 0) {
811 timeout_ms = 10000;
812 } else if (timeout_ms < 0) {
813 timeout_ms = 60000;
814 }
815
816 remaining_timeout = timeout_ms;
817 while (remaining_timeout > 0) {
818
819 crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
820 remaining_timeout, timeout_ms);
821 rc = pcmk__remote_ready(remote, remaining_timeout);
822
823 if (rc == ETIME) {
824 crm_err("Timed out (%d ms) while waiting for remote data",
825 remaining_timeout);
826 return rc;
827
828 } else if (rc != pcmk_rc_ok) {
829 crm_debug("Wait for remote data aborted (will retry): %s "
830 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
831
832 } else {
833 rc = read_available_remote_data(remote);
834 if (rc == pcmk_rc_ok) {
835 return rc;
836 } else if (rc == EAGAIN) {
837 crm_trace("Waiting for more remote data");
838 } else {
839 crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
840 pcmk_rc_str(rc), rc);
841 }
842 }
843
844
845 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
846 return rc;
847 }
848
849 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
850 }
851 return ETIME;
852 }
853
854 struct tcp_async_cb_data {
855 int sock;
856 int timeout_ms;
857 time_t start;
858 void *userdata;
859 void (*callback) (void *userdata, int rc, int sock);
860 };
861
862
863 static gboolean
864 check_connect_finished(gpointer userdata)
865 {
866 struct tcp_async_cb_data *cb_data = userdata;
867 int rc;
868
869 fd_set rset, wset;
870 struct timeval ts = { 0, };
871
872 if (cb_data->start == 0) {
873
874 rc = pcmk_rc_ok;
875 goto dispatch_done;
876 }
877
878
879 FD_ZERO(&rset);
880 FD_SET(cb_data->sock, &rset);
881 wset = rset;
882 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
883
884 if (rc < 0) {
885 rc = errno;
886 if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
887 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
888 return TRUE;
889 } else {
890 rc = ETIMEDOUT;
891 }
892 }
893 crm_trace("Could not check socket %d for connection success: %s (%d)",
894 cb_data->sock, pcmk_rc_str(rc), rc);
895
896 } else if (rc == 0) {
897 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
898 return TRUE;
899 }
900 crm_debug("Timed out while waiting for socket %d connection success",
901 cb_data->sock);
902 rc = ETIMEDOUT;
903
904
905
906 } else if (FD_ISSET(cb_data->sock, &rset)
907 || FD_ISSET(cb_data->sock, &wset)) {
908
909
910 int error = 0;
911 socklen_t len = sizeof(error);
912
913 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
914 rc = errno;
915 crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
916 cb_data->sock, pcmk_rc_str(rc), rc);
917 } else if (error != 0) {
918 rc = error;
919 crm_trace("Socket %d connected with error: %s (%d)",
920 cb_data->sock, pcmk_rc_str(rc), rc);
921 } else {
922 rc = pcmk_rc_ok;
923 }
924
925 } else {
926 crm_trace("select() succeeded, but socket %d not in resulting "
927 "read/write sets", cb_data->sock);
928 rc = EAGAIN;
929 }
930
931 dispatch_done:
932 if (rc == pcmk_rc_ok) {
933 crm_trace("Socket %d is connected", cb_data->sock);
934 } else {
935 close(cb_data->sock);
936 cb_data->sock = -1;
937 }
938
939 if (cb_data->callback) {
940 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
941 }
942 free(cb_data);
943 return FALSE;
944 }
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964 static int
965 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
966 int timeout_ms, int *timer_id, void *userdata,
967 void (*callback) (void *userdata, int rc, int sock))
968 {
969 int rc = 0;
970 int interval = 500;
971 int timer;
972 struct tcp_async_cb_data *cb_data = NULL;
973
974 rc = pcmk__set_nonblocking(sock);
975 if (rc != pcmk_rc_ok) {
976 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
977 pcmk_rc_str(rc), rc);
978 return rc;
979 }
980
981 rc = connect(sock, addr, addrlen);
982 if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
983 rc = errno;
984 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
985 pcmk_rc_str(rc), rc);
986 return rc;
987 }
988
989 cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
990 cb_data->userdata = userdata;
991 cb_data->callback = callback;
992 cb_data->sock = sock;
993 cb_data->timeout_ms = timeout_ms;
994
995 if (rc == 0) {
996
997
998
999
1000 cb_data->start = 0;
1001 interval = 1;
1002 } else {
1003 cb_data->start = time(NULL);
1004 }
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015 crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1016 interval, sock);
1017 timer = g_timeout_add(interval, check_connect_finished, cb_data);
1018 if (timer_id) {
1019 *timer_id = timer;
1020 }
1021
1022
1023
1024 return pcmk_rc_ok;
1025 }
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037 static int
1038 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1039 {
1040 int rc = connect(sock, addr, addrlen);
1041
1042 if (rc < 0) {
1043 rc = errno;
1044 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1045 pcmk_rc_str(rc), rc);
1046 return rc;
1047 }
1048
1049 rc = pcmk__set_nonblocking(sock);
1050 if (rc != pcmk_rc_ok) {
1051 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1052 pcmk_rc_str(rc), rc);
1053 return rc;
1054 }
1055
1056 return pcmk_ok;
1057 }
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075 int
1076 pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
1077 int *sock_fd, void *userdata,
1078 void (*callback) (void *userdata, int rc, int sock))
1079 {
1080 char buffer[INET6_ADDRSTRLEN];
1081 struct addrinfo *res = NULL;
1082 struct addrinfo *rp = NULL;
1083 struct addrinfo hints;
1084 const char *server = host;
1085 int rc;
1086 int sock = -1;
1087
1088 CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);
1089
1090
1091 memset(&hints, 0, sizeof(struct addrinfo));
1092 hints.ai_family = AF_UNSPEC;
1093 hints.ai_socktype = SOCK_STREAM;
1094 hints.ai_flags = AI_CANONNAME;
1095 rc = getaddrinfo(server, NULL, &hints, &res);
1096 if (rc != 0) {
1097 crm_err("Unable to get IP address info for %s: %s",
1098 server, gai_strerror(rc));
1099 rc = ENOTCONN;
1100 goto async_cleanup;
1101 }
1102 if (!res || !res->ai_addr) {
1103 crm_err("Unable to get IP address info for %s: no result", server);
1104 rc = ENOTCONN;
1105 goto async_cleanup;
1106 }
1107
1108
1109 for (rp = res; rp != NULL; rp = rp->ai_next) {
1110 struct sockaddr *addr = rp->ai_addr;
1111
1112 if (!addr) {
1113 continue;
1114 }
1115
1116 if (rp->ai_canonname) {
1117 server = res->ai_canonname;
1118 }
1119 crm_debug("Got canonical name %s for %s", server, host);
1120
1121 sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
1122 if (sock == -1) {
1123 rc = errno;
1124 crm_warn("Could not create socket for remote connection to %s:%d: "
1125 "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
1126 continue;
1127 }
1128
1129
1130
1131 if (addr->sa_family == AF_INET6) {
1132 ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
1133 } else {
1134 ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
1135 }
1136
1137 memset(buffer, 0, DIMOF(buffer));
1138 pcmk__sockaddr2str(addr, buffer);
1139 crm_info("Attempting remote connection to %s:%d", buffer, port);
1140
1141 if (callback) {
1142 if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
1143 timer_id, userdata, callback) == pcmk_rc_ok) {
1144 goto async_cleanup;
1145 }
1146
1147 } else if (connect_socket_once(sock, rp->ai_addr,
1148 rp->ai_addrlen) == pcmk_rc_ok) {
1149 break;
1150 }
1151
1152
1153 close(sock);
1154 sock = -1;
1155 rc = ENOTCONN;
1156 }
1157
1158 async_cleanup:
1159
1160 if (res) {
1161 freeaddrinfo(res);
1162 }
1163 *sock_fd = sock;
1164 return rc;
1165 }
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178 void
1179 pcmk__sockaddr2str(void *sa, char *s)
1180 {
1181 switch (((struct sockaddr*)sa)->sa_family) {
1182 case AF_INET:
1183 inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
1184 s, INET6_ADDRSTRLEN);
1185 break;
1186
1187 case AF_INET6:
1188 inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
1189 s, INET6_ADDRSTRLEN);
1190 break;
1191
1192 default:
1193 strcpy(s, "<invalid>");
1194 }
1195 }
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206 int
1207 pcmk__accept_remote_connection(int ssock, int *csock)
1208 {
1209 int rc;
1210 struct sockaddr_storage addr;
1211 socklen_t laddr = sizeof(addr);
1212 char addr_str[INET6_ADDRSTRLEN];
1213
1214
1215 memset(&addr, 0, sizeof(addr));
1216 *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1217 if (*csock == -1) {
1218 rc = errno;
1219 crm_err("Could not accept remote client connection: %s "
1220 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1221 return rc;
1222 }
1223 pcmk__sockaddr2str(&addr, addr_str);
1224 crm_info("Accepted new remote client connection from %s", addr_str);
1225
1226 rc = pcmk__set_nonblocking(*csock);
1227 if (rc != pcmk_rc_ok) {
1228 crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1229 pcmk_rc_str(rc), rc);
1230 close(*csock);
1231 *csock = -1;
1232 return rc;
1233 }
1234
1235 #ifdef TCP_USER_TIMEOUT
1236 if (pcmk__get_sbd_timeout() > 0) {
1237
1238 unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2;
1239
1240 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1241 &optval, sizeof(optval));
1242 if (rc < 0) {
1243 rc = errno;
1244 crm_err("Could not set TCP timeout to %d ms on remote connection: "
1245 "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1246 close(*csock);
1247 *csock = -1;
1248 return rc;
1249 }
1250 }
1251 #endif
1252
1253 return rc;
1254 }
1255
1256
1257
1258
1259
1260
1261 int
1262 crm_default_remote_port()
1263 {
1264 static int port = 0;
1265
1266 if (port == 0) {
1267 const char *env = getenv("PCMK_remote_port");
1268
1269 if (env) {
1270 errno = 0;
1271 port = strtol(env, NULL, 10);
1272 if (errno || (port < 1) || (port > 65535)) {
1273 crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1274 env, DEFAULT_REMOTE_PORT);
1275 port = DEFAULT_REMOTE_PORT;
1276 }
1277 } else {
1278 port = DEFAULT_REMOTE_PORT;
1279 }
1280 }
1281 return port;
1282 }