This source file includes the following definitions.
- localized_remote_header
- pcmk__tls_client_try_handshake
- pcmk__tls_client_handshake
- set_minimum_dh_bits
- get_bound_dh_bits
- pcmk__new_tls_session
- pcmk__init_tls_dh
- pcmk__read_handshake_data
- send_tls
- send_plaintext
- remote_send_iovs
- pcmk__remote_send_xml
- pcmk__remote_message_xml
- get_remote_socket
- pcmk__remote_ready
- pcmk__read_available_remote_data
- pcmk__read_remote_message
- check_connect_finished
- connect_socket_retry
- connect_socket_once
- pcmk__connect_remote
- pcmk__sockaddr2str
- pcmk__accept_remote_connection
- crm_default_remote_port
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11 #include <crm/crm.h>
12
13 #include <sys/param.h>
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/ip.h>
22 #include <netinet/tcp.h>
23 #include <netdb.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <inttypes.h>
27
28 #include <glib.h>
29 #include <bzlib.h>
30
31 #include <crm/common/ipc_internal.h>
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/remote_internal.h>
35
36 #ifdef HAVE_GNUTLS_GNUTLS_H
37 # include <gnutls/gnutls.h>
38 #endif
39
40
41 #ifdef HAVE_LINUX_SWAB_H
42 # include <linux/swab.h>
43 #else
44
45
46
47
/* Fallback byte-order reversal macros, used when <linux/swab.h> is not
 * available. Each macro expands its argument more than once, so the argument
 * must not have side effects.
 */

/* Reverse the two bytes of a 16-bit value */
#define __swab16(x) ((uint16_t)(                                        \
    ((uint16_t)(x) << 8) |                                              \
    ((uint16_t)(x) >> 8)))

/* Reverse the four bytes of a 32-bit value */
#define __swab32(x) ((uint32_t)(                                        \
    ((uint32_t)(x) << 24) |                                             \
    (((uint32_t)(x) << 8) & (uint32_t)0x00ff0000UL) |                   \
    (((uint32_t)(x) >> 8) & (uint32_t)0x0000ff00UL) |                   \
    ((uint32_t)(x) >> 24)))

/* Reverse the eight bytes of a 64-bit value */
#define __swab64(x) ((uint64_t)(                                        \
    ((uint64_t)(x) << 56) |                                             \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) |         \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) |         \
    (((uint64_t)(x) <<  8) & (uint64_t)0x000000ff00000000ULL) |         \
    (((uint64_t)(x) >>  8) & (uint64_t)0x00000000ff000000ULL) |         \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) |         \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) |         \
    ((uint64_t)(x) >> 56)))
67 #endif
68
/* Message format version written into every header this file sends */
#define REMOTE_MSG_VERSION 1

/* Marker stored in each header's "endian" field; a receiver that reads the
 * byte-swapped form of this value knows the sender used the opposite byte
 * order and swaps the remaining header fields accordingly.
 */
#define ENDIAN_LOCAL 0xBADADBBD

/* Fixed-size header that precedes every remote message payload. The struct is
 * packed and is read directly from / written directly to the connection, so
 * the order and width of the fields must not change.
 */
struct remote_header_v0 {
    uint32_t endian;    /* ENDIAN_LOCAL as written by the sender */
    uint32_t version;   /* Sender's REMOTE_MSG_VERSION */
    uint64_t id;        /* Counter that the sender increments per message */
    uint64_t flags;     /* Not interpreted by the code in this file */
    uint32_t size_total;            /* Header size plus payload size */
    uint32_t payload_offset;        /* Offset of payload from start of header */
    uint32_t payload_compressed;    /* Nonzero: bzip2-compressed payload size */
    uint32_t payload_uncompressed;  /* Payload size when not compressed; the
                                     * sender here counts the trailing NUL
                                     */
} __attribute__ ((packed));
85
86
87
88
89
90
91
92
93
94
95
96
97 static struct remote_header_v0 *
98 localized_remote_header(pcmk__remote_t *remote)
99 {
100 struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101 if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102 return NULL;
103
104 } else if(header->endian != ENDIAN_LOCAL) {
105 uint32_t endian = __swab32(header->endian);
106
107 CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
108 if(endian != ENDIAN_LOCAL) {
109 crm_err("Invalid message detected, endian mismatch: %" PRIx32
110 " is neither %" PRIx32 " nor the swab'd %" PRIx32,
111 ENDIAN_LOCAL, header->endian, endian);
112 return NULL;
113 }
114
115 header->id = __swab64(header->id);
116 header->flags = __swab64(header->flags);
117 header->endian = __swab32(header->endian);
118
119 header->version = __swab32(header->version);
120 header->size_total = __swab32(header->size_total);
121 header->payload_offset = __swab32(header->payload_offset);
122 header->payload_compressed = __swab32(header->payload_compressed);
123 header->payload_uncompressed = __swab32(header->payload_uncompressed);
124 }
125
126 return header;
127 }
128
129 #ifdef HAVE_GNUTLS_GNUTLS_H
130
131 int
132 pcmk__tls_client_try_handshake(pcmk__remote_t *remote, int *gnutls_rc)
133 {
134 int rc = pcmk_rc_ok;
135
136 if (gnutls_rc != NULL) {
137 *gnutls_rc = GNUTLS_E_SUCCESS;
138 }
139
140 rc = gnutls_handshake(*remote->tls_session);
141
142 switch (rc) {
143 case GNUTLS_E_SUCCESS:
144 rc = pcmk_rc_ok;
145 break;
146
147 case GNUTLS_E_INTERRUPTED:
148 case GNUTLS_E_AGAIN:
149 rc = EAGAIN;
150 break;
151
152 default:
153 if (gnutls_rc != NULL) {
154 *gnutls_rc = rc;
155 }
156
157 rc = EPROTO;
158 break;
159 }
160
161 return rc;
162 }
163
164 int pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_sec,
165 int *gnutls_rc)
166 {
167 const time_t time_limit = time(NULL) + timeout_sec;
168
169 do {
170 int rc = pcmk__tls_client_try_handshake(remote, gnutls_rc);
171
172 if (rc != EAGAIN) {
173 return rc;
174 }
175 } while (time(NULL) < time_limit);
176
177 return ETIME;
178 }
179
180
181
182
183
184
185
186 static void
187 set_minimum_dh_bits(const gnutls_session_t *session)
188 {
189 int dh_min_bits;
190
191 pcmk__scan_min_int(pcmk__env_option(PCMK__ENV_DH_MIN_BITS), &dh_min_bits,
192 0);
193
194
195
196
197
198 if (dh_min_bits > 0) {
199 crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
200 dh_min_bits);
201 crm_warn("Support for the " PCMK__ENV_DH_MIN_BITS " "
202 "environment variable is deprecated and will be removed "
203 "in a future release");
204 gnutls_dh_set_prime_bits(*session, dh_min_bits);
205 }
206 }
207
208 static unsigned int
209 get_bound_dh_bits(unsigned int dh_bits)
210 {
211 int dh_min_bits;
212 int dh_max_bits;
213
214 pcmk__scan_min_int(pcmk__env_option(PCMK__ENV_DH_MIN_BITS), &dh_min_bits,
215 0);
216 pcmk__scan_min_int(pcmk__env_option(PCMK__ENV_DH_MAX_BITS), &dh_max_bits,
217 0);
218
219 if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
220 crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
221 dh_max_bits = 0;
222 }
223 if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
224 return dh_min_bits;
225 }
226 if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
227 return dh_max_bits;
228 }
229 return dh_bits;
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243 gnutls_session_t *
244 pcmk__new_tls_session(int csock, unsigned int conn_type,
245 gnutls_credentials_type_t cred_type, void *credentials)
246 {
247 int rc = GNUTLS_E_SUCCESS;
248 const char *prio_base = NULL;
249 char *prio = NULL;
250 gnutls_session_t *session = NULL;
251
252
253
254
255
256
257
258
259 prio_base = pcmk__env_option(PCMK__ENV_TLS_PRIORITIES);
260 if (prio_base == NULL) {
261 prio_base = PCMK_GNUTLS_PRIORITIES;
262 }
263 prio = crm_strdup_printf("%s:%s", prio_base,
264 (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
265
266 session = gnutls_malloc(sizeof(gnutls_session_t));
267 if (session == NULL) {
268 rc = GNUTLS_E_MEMORY_ERROR;
269 goto error;
270 }
271
272 rc = gnutls_init(session, conn_type);
273 if (rc != GNUTLS_E_SUCCESS) {
274 goto error;
275 }
276
277
278
279
280
281 rc = gnutls_priority_set_direct(*session, prio, NULL);
282 if (rc != GNUTLS_E_SUCCESS) {
283 goto error;
284 }
285 if (conn_type == GNUTLS_CLIENT) {
286 set_minimum_dh_bits(session);
287 }
288
289 gnutls_transport_set_ptr(*session,
290 (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
291
292 rc = gnutls_credentials_set(*session, cred_type, credentials);
293 if (rc != GNUTLS_E_SUCCESS) {
294 goto error;
295 }
296 free(prio);
297 return session;
298
299 error:
300 crm_err("Could not initialize %s TLS %s session: %s "
301 CRM_XS " rc=%d priority='%s'",
302 (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
303 (conn_type == GNUTLS_SERVER)? "server" : "client",
304 gnutls_strerror(rc), rc, prio);
305 free(prio);
306 if (session != NULL) {
307 gnutls_free(session);
308 }
309 return NULL;
310 }
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327 int
328 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
329 {
330 int rc = GNUTLS_E_SUCCESS;
331 unsigned int dh_bits = 0;
332
333 rc = gnutls_dh_params_init(dh_params);
334 if (rc != GNUTLS_E_SUCCESS) {
335 goto error;
336 }
337
338 dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
339 GNUTLS_SEC_PARAM_NORMAL);
340 if (dh_bits == 0) {
341 rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
342 goto error;
343 }
344 dh_bits = get_bound_dh_bits(dh_bits);
345
346 crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
347 dh_bits);
348 rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
349 if (rc != GNUTLS_E_SUCCESS) {
350 goto error;
351 }
352
353 return pcmk_rc_ok;
354
355 error:
356 crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
357 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
358 return EPROTO;
359 }
360
361
362
363
364
365
366
367
368
369
370
371
372 int
373 pcmk__read_handshake_data(const pcmk__client_t *client)
374 {
375 int rc = 0;
376
377 pcmk__assert((client != NULL) && (client->remote != NULL)
378 && (client->remote->tls_session != NULL));
379
380 do {
381 rc = gnutls_handshake(*client->remote->tls_session);
382 } while (rc == GNUTLS_E_INTERRUPTED);
383
384 if (rc == GNUTLS_E_AGAIN) {
385
386
387
388 return EAGAIN;
389 } else if (rc != GNUTLS_E_SUCCESS) {
390 crm_err("TLS handshake with remote client failed: %s "
391 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
392 return EPROTO;
393 }
394 return pcmk_rc_ok;
395 }
396
397
398 static int
399 send_tls(gnutls_session_t *session, struct iovec *iov)
400 {
401 const char *unsent = iov->iov_base;
402 size_t unsent_len = iov->iov_len;
403 ssize_t gnutls_rc;
404
405 if (unsent == NULL) {
406 return EINVAL;
407 }
408
409 crm_trace("Sending TLS message of %llu bytes",
410 (unsigned long long) unsent_len);
411 while (true) {
412 gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
413
414 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
415 crm_trace("Retrying to send %llu bytes remaining",
416 (unsigned long long) unsent_len);
417
418 } else if (gnutls_rc < 0) {
419
420 crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
421 gnutls_strerror((int) gnutls_rc),
422 (long long) gnutls_rc);
423 return ECONNABORTED;
424
425 } else if (gnutls_rc < unsent_len) {
426 crm_trace("Sent %lld of %llu bytes remaining",
427 (long long) gnutls_rc, (unsigned long long) unsent_len);
428 unsent_len -= gnutls_rc;
429 unsent += gnutls_rc;
430 } else {
431 crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
432 break;
433 }
434 }
435 return pcmk_rc_ok;
436 }
437 #endif
438
439
440 static int
441 send_plaintext(int sock, struct iovec *iov)
442 {
443 const char *unsent = iov->iov_base;
444 size_t unsent_len = iov->iov_len;
445 ssize_t write_rc;
446
447 if (unsent == NULL) {
448 return EINVAL;
449 }
450
451 crm_debug("Sending plaintext message of %llu bytes to socket %d",
452 (unsigned long long) unsent_len, sock);
453 while (true) {
454 write_rc = write(sock, unsent, unsent_len);
455 if (write_rc < 0) {
456 int rc = errno;
457
458 if ((errno == EINTR) || (errno == EAGAIN)) {
459 crm_trace("Retrying to send %llu bytes remaining to socket %d",
460 (unsigned long long) unsent_len, sock);
461 continue;
462 }
463
464
465 crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
466 pcmk_rc_str(rc), rc, sock);
467 return rc;
468
469 } else if (write_rc < unsent_len) {
470 crm_trace("Sent %lld of %llu bytes remaining",
471 (long long) write_rc, (unsigned long long) unsent_len);
472 unsent += write_rc;
473 unsent_len -= write_rc;
474 continue;
475
476 } else {
477 crm_trace("Sent all %lld bytes remaining: %.100s",
478 (long long) write_rc, (char *) (iov->iov_base));
479 break;
480 }
481 }
482 return pcmk_rc_ok;
483 }
484
485
486 static int
487 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
488 {
489 int rc = pcmk_rc_ok;
490
491 for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
492 #ifdef HAVE_GNUTLS_GNUTLS_H
493 if (remote->tls_session) {
494 rc = send_tls(remote->tls_session, &(iov[lpc]));
495 continue;
496 }
497 #endif
498 if (remote->tcp_socket) {
499 rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
500 } else {
501 rc = ESOCKTNOSUPPORT;
502 }
503 }
504 return rc;
505 }
506
507
508
509
510
511
512
513
514
515
516 int
517 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
518 {
519 int rc = pcmk_rc_ok;
520 static uint64_t id = 0;
521 GString *xml_text = NULL;
522
523 struct iovec iov[2];
524 struct remote_header_v0 *header;
525
526 CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
527
528 xml_text = g_string_sized_new(1024);
529 pcmk__xml_string(msg, 0, xml_text, 0);
530 CRM_CHECK(xml_text->len > 0,
531 g_string_free(xml_text, TRUE); return EINVAL);
532
533 header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
534
535 iov[0].iov_base = header;
536 iov[0].iov_len = sizeof(struct remote_header_v0);
537
538 iov[1].iov_len = 1 + xml_text->len;
539 iov[1].iov_base = g_string_free(xml_text, FALSE);
540
541 id++;
542 header->id = id;
543 header->endian = ENDIAN_LOCAL;
544 header->version = REMOTE_MSG_VERSION;
545 header->payload_offset = iov[0].iov_len;
546 header->payload_uncompressed = iov[1].iov_len;
547 header->size_total = iov[0].iov_len + iov[1].iov_len;
548
549 rc = remote_send_iovs(remote, iov, 2);
550 if (rc != pcmk_rc_ok) {
551 crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
552 pcmk_rc_str(rc), rc);
553 }
554
555 free(iov[0].iov_base);
556 g_free((gchar *) iov[1].iov_base);
557 return rc;
558 }
559
560
561
562
563
564
565
566
567
568
569 xmlNode *
570 pcmk__remote_message_xml(pcmk__remote_t *remote)
571 {
572 xmlNode *xml = NULL;
573 struct remote_header_v0 *header = localized_remote_header(remote);
574
575 if (header == NULL) {
576 return NULL;
577 }
578
579
580 if (header->payload_compressed) {
581 int rc = 0;
582 unsigned int size_u = 1 + header->payload_uncompressed;
583 char *uncompressed =
584 pcmk__assert_alloc(1, header->payload_offset + size_u);
585
586 crm_trace("Decompressing message data %d bytes into %d bytes",
587 header->payload_compressed, size_u);
588
589 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
590 remote->buffer + header->payload_offset,
591 header->payload_compressed, 1, 0);
592 rc = pcmk__bzlib2rc(rc);
593
594 if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
595 crm_warn("Couldn't decompress v%d message, we only understand v%d",
596 header->version, REMOTE_MSG_VERSION);
597 free(uncompressed);
598 return NULL;
599
600 } else if (rc != pcmk_rc_ok) {
601 crm_err("Decompression failed: %s " CRM_XS " rc=%d",
602 pcmk_rc_str(rc), rc);
603 free(uncompressed);
604 return NULL;
605 }
606
607 pcmk__assert(size_u == header->payload_uncompressed);
608
609 memcpy(uncompressed, remote->buffer, header->payload_offset);
610 remote->buffer_size = header->payload_offset + size_u;
611
612 free(remote->buffer);
613 remote->buffer = uncompressed;
614 header = localized_remote_header(remote);
615 }
616
617
618 remote->buffer_offset = 0;
619
620 CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
621
622 xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
623 if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
624 crm_warn("Couldn't parse v%d message, we only understand v%d",
625 header->version, REMOTE_MSG_VERSION);
626
627 } else if (xml == NULL) {
628 crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
629 }
630
631 crm_log_xml_trace(xml, "[remote msg]");
632 return xml;
633 }
634
635 static int
636 get_remote_socket(const pcmk__remote_t *remote)
637 {
638 #ifdef HAVE_GNUTLS_GNUTLS_H
639 if (remote->tls_session) {
640 void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
641
642 return GPOINTER_TO_INT(sock_ptr);
643 }
644 #endif
645
646 if (remote->tcp_socket) {
647 return remote->tcp_socket;
648 }
649
650 crm_err("Remote connection type undetermined (bug?)");
651 return -1;
652 }
653
654
655
656
657
658
659
660
661
662
663
664
665 int
666 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
667 {
668 struct pollfd fds = { 0, };
669 int sock = 0;
670 int rc = 0;
671 time_t start;
672 int timeout = timeout_ms;
673
674 sock = get_remote_socket(remote);
675 if (sock <= 0) {
676 crm_trace("No longer connected");
677 return ENOTCONN;
678 }
679
680 start = time(NULL);
681 errno = 0;
682 do {
683 fds.fd = sock;
684 fds.events = POLLIN;
685
686
687
688
689 if (errno == EINTR && (timeout > 0)) {
690 timeout = timeout_ms - ((time(NULL) - start) * 1000);
691 if (timeout < 1000) {
692 timeout = 1000;
693 }
694 }
695
696 rc = poll(&fds, 1, timeout);
697 } while (rc < 0 && errno == EINTR);
698
699 if (rc < 0) {
700 return errno;
701 }
702 return (rc == 0)? ETIME : pcmk_rc_ok;
703 }
704
705
706
707
708
709
710
711
712
713
714
715
716
717 int
718 pcmk__read_available_remote_data(pcmk__remote_t *remote)
719 {
720 int rc = pcmk_rc_ok;
721 size_t read_len = sizeof(struct remote_header_v0);
722 struct remote_header_v0 *header = localized_remote_header(remote);
723 bool received = false;
724 ssize_t read_rc;
725
726 if(header) {
727
728 read_len = header->size_total;
729 }
730
731
732 if(remote->buffer_size < read_len) {
733 remote->buffer_size = 2 * read_len;
734 crm_trace("Expanding buffer to %llu bytes",
735 (unsigned long long) remote->buffer_size);
736 remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
737 }
738
739 #ifdef HAVE_GNUTLS_GNUTLS_H
740 if (!received && remote->tls_session) {
741 read_rc = gnutls_record_recv(*(remote->tls_session),
742 remote->buffer + remote->buffer_offset,
743 remote->buffer_size - remote->buffer_offset);
744 if (read_rc == GNUTLS_E_INTERRUPTED) {
745 rc = EINTR;
746 } else if (read_rc == GNUTLS_E_AGAIN) {
747 rc = EAGAIN;
748 } else if (read_rc < 0) {
749 crm_debug("TLS receive failed: %s (%lld)",
750 gnutls_strerror(read_rc), (long long) read_rc);
751 rc = EIO;
752 }
753 received = true;
754 }
755 #endif
756
757 if (!received && remote->tcp_socket) {
758 read_rc = read(remote->tcp_socket,
759 remote->buffer + remote->buffer_offset,
760 remote->buffer_size - remote->buffer_offset);
761 if (read_rc < 0) {
762 rc = errno;
763 }
764 received = true;
765 }
766
767 if (!received) {
768 crm_err("Remote connection type undetermined (bug?)");
769 return ESOCKTNOSUPPORT;
770 }
771
772
773 if (read_rc > 0) {
774 remote->buffer_offset += read_rc;
775
776 remote->buffer[remote->buffer_offset] = '\0';
777 crm_trace("Received %lld more bytes (%llu total)",
778 (long long) read_rc,
779 (unsigned long long) remote->buffer_offset);
780
781 } else if ((rc == EINTR) || (rc == EAGAIN)) {
782 crm_trace("No data available for non-blocking remote read: %s (%d)",
783 pcmk_rc_str(rc), rc);
784
785 } else if (read_rc == 0) {
786 crm_debug("End of remote data encountered after %llu bytes",
787 (unsigned long long) remote->buffer_offset);
788 return ENOTCONN;
789
790 } else {
791 crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
792 (unsigned long long) remote->buffer_offset,
793 pcmk_rc_str(rc), rc);
794 return ENOTCONN;
795 }
796
797 header = localized_remote_header(remote);
798 if(header) {
799 if(remote->buffer_offset < header->size_total) {
800 crm_trace("Read partial remote message (%llu of %u bytes)",
801 (unsigned long long) remote->buffer_offset,
802 header->size_total);
803 } else {
804 crm_trace("Read full remote message of %llu bytes",
805 (unsigned long long) remote->buffer_offset);
806 return pcmk_rc_ok;
807 }
808 }
809
810 return EAGAIN;
811 }
812
813
814
815
816
817
818
819
820
821
822
823 int
824 pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
825 {
826 int rc = pcmk_rc_ok;
827 time_t start = time(NULL);
828 int remaining_timeout = 0;
829
830 if (timeout_ms == 0) {
831 timeout_ms = 10000;
832 } else if (timeout_ms < 0) {
833 timeout_ms = 60000;
834 }
835
836 remaining_timeout = timeout_ms;
837 while (remaining_timeout > 0) {
838
839 crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
840 remaining_timeout, timeout_ms);
841 rc = pcmk__remote_ready(remote, remaining_timeout);
842
843 if (rc == ETIME) {
844 crm_err("Timed out (%d ms) while waiting for remote data",
845 remaining_timeout);
846 return rc;
847
848 } else if (rc != pcmk_rc_ok) {
849 crm_debug("Wait for remote data aborted (will retry): %s "
850 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
851
852 } else {
853 rc = pcmk__read_available_remote_data(remote);
854 if (rc == pcmk_rc_ok) {
855 return rc;
856 } else if (rc == EAGAIN) {
857 crm_trace("Waiting for more remote data");
858 } else {
859 crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
860 pcmk_rc_str(rc), rc);
861 }
862 }
863
864
865 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
866 return rc;
867 }
868
869 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
870 }
871 return ETIME;
872 }
873
/* State carried between an asynchronous connect() attempt and the timer
 * callback that checks whether it has finished
 */
struct tcp_async_cb_data {
    int sock;           /* Socket being connected */
    int timeout_ms;     /* How long to keep checking, in milliseconds */
    time_t start;       /* When the attempt began, or 0 if connect() had
                         * already succeeded when the check was scheduled
                         */
    void *userdata;     /* Passed back to callback unchanged */
    void (*callback) (void *userdata, int rc, int sock);   /* Told the result */
};
881
882
883 static gboolean
884 check_connect_finished(gpointer userdata)
885 {
886 struct tcp_async_cb_data *cb_data = userdata;
887 int rc;
888
889 fd_set rset, wset;
890 struct timeval ts = { 0, };
891
892 if (cb_data->start == 0) {
893
894 rc = pcmk_rc_ok;
895 goto dispatch_done;
896 }
897
898
899 FD_ZERO(&rset);
900 FD_SET(cb_data->sock, &rset);
901 wset = rset;
902 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
903
904 if (rc < 0) {
905 rc = errno;
906 if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
907 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
908 return TRUE;
909 } else {
910 rc = ETIMEDOUT;
911 }
912 }
913 crm_trace("Could not check socket %d for connection success: %s (%d)",
914 cb_data->sock, pcmk_rc_str(rc), rc);
915
916 } else if (rc == 0) {
917 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
918 return TRUE;
919 }
920 crm_debug("Timed out while waiting for socket %d connection success",
921 cb_data->sock);
922 rc = ETIMEDOUT;
923
924
925
926 } else if (FD_ISSET(cb_data->sock, &rset)
927 || FD_ISSET(cb_data->sock, &wset)) {
928
929
930 int error = 0;
931 socklen_t len = sizeof(error);
932
933 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
934 rc = errno;
935 crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
936 cb_data->sock, pcmk_rc_str(rc), rc);
937 } else if (error != 0) {
938 rc = error;
939 crm_trace("Socket %d connected with error: %s (%d)",
940 cb_data->sock, pcmk_rc_str(rc), rc);
941 } else {
942 rc = pcmk_rc_ok;
943 }
944
945 } else {
946 crm_trace("select() succeeded, but socket %d not in resulting "
947 "read/write sets", cb_data->sock);
948 rc = EAGAIN;
949 }
950
951 dispatch_done:
952 if (rc == pcmk_rc_ok) {
953 crm_trace("Socket %d is connected", cb_data->sock);
954 } else {
955 close(cb_data->sock);
956 cb_data->sock = -1;
957 }
958
959 if (cb_data->callback) {
960 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
961 }
962 free(cb_data);
963 return FALSE;
964 }
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984 static int
985 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
986 int timeout_ms, int *timer_id, void *userdata,
987 void (*callback) (void *userdata, int rc, int sock))
988 {
989 int rc = 0;
990 int interval = 500;
991 int timer;
992 struct tcp_async_cb_data *cb_data = NULL;
993
994 rc = pcmk__set_nonblocking(sock);
995 if (rc != pcmk_rc_ok) {
996 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
997 pcmk_rc_str(rc), rc);
998 return rc;
999 }
1000
1001 rc = connect(sock, addr, addrlen);
1002 if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
1003 rc = errno;
1004 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1005 pcmk_rc_str(rc), rc);
1006 return rc;
1007 }
1008
1009 cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
1010 cb_data->userdata = userdata;
1011 cb_data->callback = callback;
1012 cb_data->sock = sock;
1013 cb_data->timeout_ms = timeout_ms;
1014
1015 if (rc == 0) {
1016
1017
1018
1019
1020 cb_data->start = 0;
1021 interval = 1;
1022 } else {
1023 cb_data->start = time(NULL);
1024 }
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035 crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1036 interval, sock);
1037 timer = g_timeout_add(interval, check_connect_finished, cb_data);
1038 if (timer_id) {
1039 *timer_id = timer;
1040 }
1041
1042
1043
1044 return pcmk_rc_ok;
1045 }
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057 static int
1058 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1059 {
1060 int rc = connect(sock, addr, addrlen);
1061
1062 if (rc < 0) {
1063 rc = errno;
1064 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1065 pcmk_rc_str(rc), rc);
1066 return rc;
1067 }
1068
1069 rc = pcmk__set_nonblocking(sock);
1070 if (rc != pcmk_rc_ok) {
1071 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1072 pcmk_rc_str(rc), rc);
1073 return rc;
1074 }
1075
1076 return pcmk_ok;
1077 }
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
/*!
 * \internal
 * \brief Connect to a remote host over TCP
 *
 * Each address that \p host resolves to is tried in turn until one works.
 *
 * \param[in]  host      Host name or address to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Passed to the asynchronous connect as its timeout in
 *                       milliseconds (unused if \p callback is NULL)
 * \param[out] timer_id  If not NULL and \p callback is not NULL, set to the ID
 *                       of the timer that checks the asynchronous connect
 * \param[out] sock_fd   Set to the socket on success, otherwise -1
 * \param[in]  userdata  Passed to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and report the
 *                       outcome through this function; if NULL, connect
 *                       synchronously
 *
 * \return pcmk_rc_ok if a connection was made (or, asynchronously, started),
 *         EINVAL for missing arguments, an address-resolution error, the
 *         socket() error if the last address tried could not get a socket, or
 *         ENOTCONN if the last address tried could not be connected
 *
 * \note A successful attempt always yields pcmk_rc_ok, even if earlier
 *       addresses failed.
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = rp->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* Take rc from this attempt, so that success is reported as success
         * even when an earlier address left an error code behind
         */
        if (callback) {
            rc = connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
                                      timeout, timer_id, userdata, callback);
            if (rc == pcmk_rc_ok) {
                goto async_cleanup; // Result will come via the callback
            }

        } else {
            rc = connect_socket_once(sock, rp->ai_addr, rp->ai_addrlen);
            if (rc == pcmk_rc_ok) {
                break;          /* Success */
            }
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
/*!
 * \internal
 * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 *
 * \param[in]  sa  Socket address (struct sockaddr_in or struct sockaddr_in6,
 *                 as indicated by its address family)
 * \param[out] s   Where to store the text form of the address, or
 *                 "<invalid>" for any other address family
 *
 * \note \p s must point to a buffer of at least INET6_ADDRSTRLEN bytes.
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const sa_family_t family = ((const struct sockaddr *) sa)->sa_family;

    if (family == AF_INET) {
        const struct sockaddr_in *ipv4 = sa;

        inet_ntop(AF_INET, &(ipv4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (family == AF_INET6) {
        const struct sockaddr_in6 *ipv6 = sa;

        inet_ntop(AF_INET6, &(ipv6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230 int
1231 pcmk__accept_remote_connection(int ssock, int *csock)
1232 {
1233 int rc;
1234 struct sockaddr_storage addr;
1235 socklen_t laddr = sizeof(addr);
1236 char addr_str[INET6_ADDRSTRLEN];
1237 #ifdef TCP_USER_TIMEOUT
1238 long sbd_timeout = 0;
1239 #endif
1240
1241
1242 memset(&addr, 0, sizeof(addr));
1243 *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1244 if (*csock == -1) {
1245 rc = errno;
1246 crm_err("Could not accept remote client connection: %s "
1247 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1248 return rc;
1249 }
1250 pcmk__sockaddr2str(&addr, addr_str);
1251 crm_info("Accepted new remote client connection from %s", addr_str);
1252
1253 rc = pcmk__set_nonblocking(*csock);
1254 if (rc != pcmk_rc_ok) {
1255 crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1256 pcmk_rc_str(rc), rc);
1257 close(*csock);
1258 *csock = -1;
1259 return rc;
1260 }
1261
1262 #ifdef TCP_USER_TIMEOUT
1263 sbd_timeout = pcmk__get_sbd_watchdog_timeout();
1264 if (sbd_timeout > 0) {
1265
1266 long half = sbd_timeout / 2;
1267 unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
1268
1269 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1270 &optval, sizeof(optval));
1271 if (rc < 0) {
1272 rc = errno;
1273 crm_err("Could not set TCP timeout to %d ms on remote connection: "
1274 "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1275 close(*csock);
1276 *csock = -1;
1277 return rc;
1278 }
1279 }
1280 #endif
1281
1282 return rc;
1283 }
1284
1285
1286
1287
1288
1289
1290 int
1291 crm_default_remote_port(void)
1292 {
1293 static int port = 0;
1294
1295 if (port == 0) {
1296 const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1297
1298 if (env) {
1299 errno = 0;
1300 port = strtol(env, NULL, 10);
1301 if (errno || (port < 1) || (port > 65535)) {
1302 crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1303 " has invalid value '%s', using %d instead",
1304 env, DEFAULT_REMOTE_PORT);
1305 port = DEFAULT_REMOTE_PORT;
1306 }
1307 } else {
1308 port = DEFAULT_REMOTE_PORT;
1309 }
1310 }
1311 return port;
1312 }