This source file includes the following definitions.
- pcmk__cpg_local_nodeid
- crm_cs_flush_cb
- crm_cs_flush
- pcmk_cpg_dispatch
- ais_dest
- msg_type2text
- check_message_sanity
- pcmk__cpg_message_data
- cmp_member_list_nodeid
- cpgreason2str
- peer_name
- node_left
- pcmk__cpg_confchg_cb
- pcmk_cpg_set_deliver_fn
- pcmk_cpg_set_confchg_fn
- pcmk__cpg_connect
- pcmk__cpg_disconnect
- send_cpg_text
- pcmk__cpg_send_xml
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11
12 #include <arpa/inet.h>
13 #include <inttypes.h>
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdbool.h>
17 #include <stdint.h>
18 #include <sys/socket.h>
19 #include <sys/types.h>
20 #include <sys/utsname.h>
21
22 #include <bzlib.h>
23 #include <corosync/corodefs.h>
24 #include <corosync/corotypes.h>
25 #include <corosync/hdb.h>
26 #include <corosync/cpg.h>
27 #include <qb/qbipc_common.h>
28 #include <qb/qbipcc.h>
29 #include <qb/qbutil.h>
30
31 #include <crm/cluster/internal.h>
32 #include <crm/common/ipc.h>
33 #include <crm/common/ipc_internal.h>
34 #include <crm/common/mainloop.h>
35 #include <crm/common/xml.h>
36
37 #include "crmcluster_private.h"
38
39
40
41
// CPG handle used when sending queued messages (0 while not connected)
static cpg_handle_t pcmk_cpg_handle = 0;

// Set when a configuration change shows the local node is not a group member
static bool cpg_evicted = false;
static GList *cs_message_queue = NULL;  // Outbound messages (struct iovec *)
static int cs_message_timer = 0;        // ID of pending flush timer, 0 if none
48
49
50
51
52
53
/*!
 * \internal
 * \brief Description of one end point (sender or recipient) of a CPG message
 *
 * Two of these are embedded in every pcmk__cpg_msg_t, which is multicast as
 * raw bytes, so member order, types, and packing are part of the wire format.
 */
struct pcmk__cpg_host_s {
    uint32_t id;                // Cluster-layer node ID (0 if not specified)
    uint32_t pid;               // Process ID (filled in for the sender)
    gboolean local;             // Not referenced by the code visible here
    enum pcmk_ipc_server type;  // Server/subsystem at this end point
    uint32_t size;              // Length of uname in bytes (0 if no name)
    char uname[MAX_NAME];       // Node name, zero-padded
} __attribute__ ((packed));

typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
64
65 struct pcmk__cpg_msg_s {
66 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
67 uint32_t id;
68 gboolean is_compressed;
69
70 pcmk__cpg_host_t host;
71 pcmk__cpg_host_t sender;
72
73 uint32_t size;
74 uint32_t compressed_size;
75
76 char data[0];
77
78 } __attribute__ ((packed));
79
80 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
81
// Forward declaration: crm_cs_flush_cb() and crm_cs_flush() call each other
static void crm_cs_flush(gpointer data);
83
/*!
 * \internal
 * \brief Get the number of payload bytes a CPG message actually carries
 *
 * \param[in] msg  Pointer to a message (evaluated more than once, so it must
 *                 not have side effects)
 *
 * \return \c compressed_size if the message is compressed, otherwise \c size
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
85
/*!
 * \internal
 * \brief Run a corosync call, retrying while it reports a transient condition
 *
 * The call is repeated as long as it returns CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL and fewer than \p max retries have been made. Before each
 * retry, the calling thread sleeps for \p counter seconds (1, 2, 3, ...).
 *
 * \param[out]    rc       Variable that receives the result of the last call
 * \param[in,out] counter  Integer retry counter (caller sets it to 0 first)
 * \param[in]     max      Stop once \p counter reaches this value
 * \param[in]     code     Expression to evaluate (re-evaluated on each try)
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
96
97
98
99
100
101
102
103
104
105 uint32_t
106 pcmk__cpg_local_nodeid(cpg_handle_t handle)
107 {
108 cs_error_t rc = CS_OK;
109 int retries = 0;
110 static uint32_t local_nodeid = 0;
111 cpg_handle_t local_handle = handle;
112 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
113 int fd = -1;
114 uid_t found_uid = 0;
115 gid_t found_gid = 0;
116 pid_t found_pid = 0;
117 int rv = 0;
118
119 if (local_nodeid != 0) {
120 return local_nodeid;
121 }
122
123 if (handle == 0) {
124 crm_trace("Creating connection");
125 cs_repeat(rc, retries, 5,
126 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
127 (cpg_model_data_t *) &cpg_model_info,
128 NULL));
129 if (rc != CS_OK) {
130 crm_err("Could not connect to the CPG API: %s (%d)",
131 cs_strerror(rc), rc);
132 return 0;
133 }
134
135 rc = cpg_fd_get(local_handle, &fd);
136 if (rc != CS_OK) {
137 crm_err("Could not obtain the CPG API connection: %s (%d)",
138 cs_strerror(rc), rc);
139 goto bail;
140 }
141
142
143 rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
144 &found_uid, &found_gid);
145 if (rv == 0) {
146 crm_err("CPG provider is not authentic:"
147 " process %lld (uid: %lld, gid: %lld)",
148 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
149 (long long) found_uid, (long long) found_gid);
150 goto bail;
151
152 } else if (rv < 0) {
153 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
154 strerror(-rv), -rv);
155 goto bail;
156 }
157 }
158
159 if (rc == CS_OK) {
160 retries = 0;
161 crm_trace("Performing lookup");
162 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
163 }
164
165 if (rc != CS_OK) {
166 crm_err("Could not get local node id from the CPG API: %s (%d)",
167 pcmk__cs_err_str(rc), rc);
168 }
169
170 bail:
171 if (handle == 0) {
172 crm_trace("Closing connection");
173 cpg_finalize(local_handle);
174 }
175 crm_debug("Local nodeid is %u", local_nodeid);
176 return local_nodeid;
177 }
178
179
180
181
182
183
184
185
186
/*!
 * \internal
 * \brief Timer callback that retries sending queued CPG messages
 *
 * \param[in] data  Passed through unchanged to crm_cs_flush()
 *
 * \return FALSE always (presumably so the timer does not fire again, per the
 *         GLib source convention -- confirm against pcmk__create_timer())
 */
static gboolean
crm_cs_flush_cb(gpointer data)
{
    // Must be cleared first: crm_cs_flush() does nothing while a timer is set
    cs_message_timer = 0;
    crm_cs_flush(data);
    return FALSE;
}
194
195
/* Most messages crm_cs_flush() will send in one call; also the queue length at
 * which it logs a warning, and the base of its retry delay in milliseconds
 */
#define CS_SEND_MAX 200
197
198
199
200
201
202
203
204 static void
205 crm_cs_flush(gpointer data)
206 {
207 unsigned int sent = 0;
208 guint queue_len = 0;
209 cs_error_t rc = 0;
210 cpg_handle_t *handle = (cpg_handle_t *) data;
211
212 if (*handle == 0) {
213 crm_trace("Connection is dead");
214 return;
215 }
216
217 queue_len = g_list_length(cs_message_queue);
218 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
219 crm_err("CPG queue has grown to %d", queue_len);
220
221 } else if (queue_len == CS_SEND_MAX) {
222 crm_warn("CPG queue has grown to %d", queue_len);
223 }
224
225 if (cs_message_timer != 0) {
226
227 crm_trace("Timer active %d", cs_message_timer);
228 return;
229 }
230
231 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
232 struct iovec *iov = cs_message_queue->data;
233
234 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
235 if (rc != CS_OK) {
236 break;
237 }
238
239 sent++;
240 crm_trace("CPG message sent, size=%llu",
241 (unsigned long long) iov->iov_len);
242
243 cs_message_queue = g_list_remove(cs_message_queue, iov);
244 free(iov->iov_base);
245 free(iov);
246 }
247
248 queue_len -= sent;
249 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
250 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
251 sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
252 (int) rc);
253
254 if (cs_message_queue) {
255 uint32_t delay_ms = 100;
256 if (rc != CS_OK) {
257
258 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
259 }
260 cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
261 }
262 }
263
264
265
266
267
268
269
270
271
272 static int
273 pcmk_cpg_dispatch(gpointer user_data)
274 {
275 cs_error_t rc = CS_OK;
276 pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
277
278 rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
279 if (rc != CS_OK) {
280 crm_err("Connection to the CPG API failed: %s (%d)",
281 pcmk__cs_err_str(rc), rc);
282 cpg_finalize(cluster->priv->cpg_handle);
283 cluster->priv->cpg_handle = 0;
284 return -1;
285
286 } else if (cpg_evicted) {
287 crm_err("Evicted from CPG membership");
288 return -1;
289 }
290 return 0;
291 }
292
293 static inline const char *
294 ais_dest(const pcmk__cpg_host_t *host)
295 {
296 return (host->size > 0)? host->uname : "<all>";
297 }
298
299 static inline const char *
300 msg_type2text(enum pcmk_ipc_server type)
301 {
302 const char *name = pcmk__server_message_type(type);
303
304 return pcmk__s(name, "unknown");
305 }
306
307
308
309
310
311
312
313
314
315 static bool
316 check_message_sanity(const pcmk__cpg_msg_t *msg)
317 {
318 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
319
320 if (payload_size < 1) {
321 crm_err("%sCPG message %d from %s invalid: "
322 "Claimed size of %d bytes is too small "
323 QB_XS " from %s[%u] to %s@%s",
324 (msg->is_compressed? "Compressed " : ""),
325 msg->id, ais_dest(&(msg->sender)),
326 (int) msg->header.size,
327 msg_type2text(msg->sender.type), msg->sender.pid,
328 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
329 return false;
330 }
331
332 if (msg->header.error != CS_OK) {
333 crm_err("%sCPG message %d from %s invalid: "
334 "Sender indicated error %d "
335 QB_XS " from %s[%u] to %s@%s",
336 (msg->is_compressed? "Compressed " : ""),
337 msg->id, ais_dest(&(msg->sender)),
338 msg->header.error,
339 msg_type2text(msg->sender.type), msg->sender.pid,
340 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
341 return false;
342 }
343
344 if (msg_data_len(msg) != payload_size) {
345 crm_err("%sCPG message %d from %s invalid: "
346 "Total size %d inconsistent with payload size %d "
347 QB_XS " from %s[%u] to %s@%s",
348 (msg->is_compressed? "Compressed " : ""),
349 msg->id, ais_dest(&(msg->sender)),
350 (int) msg->header.size, (int) msg_data_len(msg),
351 msg_type2text(msg->sender.type), msg->sender.pid,
352 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
353 return false;
354 }
355
356 if (!msg->is_compressed &&
357
358
359
360 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
361 || (msg->data[msg->size - 1] != '\0'))) {
362 crm_err("CPG message %d from %s invalid: "
363 "Payload does not end at byte %llu "
364 QB_XS " from %s[%u] to %s@%s",
365 msg->id, ais_dest(&(msg->sender)),
366 (unsigned long long) msg->size,
367 msg_type2text(msg->sender.type), msg->sender.pid,
368 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
369 return false;
370 }
371
372 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
373 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
374 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
375 ais_dest(&(msg->sender)),
376 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
377 return true;
378 }
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396 char *
397 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
398 void *content, const char **from)
399 {
400 char *data = NULL;
401 pcmk__cpg_msg_t *msg = content;
402
403 if (from != NULL) {
404 *from = NULL;
405 }
406
407 if (handle != 0) {
408 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
409 const char *local_name = pcmk__cluster_local_node_name();
410
411
412 if (msg->sender.id == 0) {
413 msg->sender.id = sender_id;
414 } else if (msg->sender.id != sender_id) {
415 crm_warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
416 ": claimed ID %" PRIu32,
417 sender_id, pid, msg->sender.id);
418 return NULL;
419 }
420
421
422 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
423 crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
424 ": for ID %" PRIu32 " not %" PRIu32,
425 sender_id, pid, msg->host.id, local_nodeid);
426 return NULL;
427 }
428 if ((msg->host.size > 0)
429 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
430
431 crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
432 ": for name %s not %s",
433 sender_id, pid, msg->host.uname, local_name);
434 return NULL;
435 }
436
437
438 if (msg->sender.size == 0) {
439 const pcmk__node_status_t *peer =
440 pcmk__get_node(sender_id, NULL, NULL,
441 pcmk__node_search_cluster_member);
442
443 if (peer->name == NULL) {
444 crm_debug("Received CPG message from node with ID %" PRIu32
445 " but its name is unknown", sender_id);
446 } else {
447 crm_debug("Updating name of CPG message sender with ID %" PRIu32
448 " to %s", sender_id, peer->name);
449 msg->sender.size = strlen(peer->name);
450 memset(msg->sender.uname, 0, MAX_NAME);
451 memcpy(msg->sender.uname, peer->name, msg->sender.size);
452 }
453 }
454 }
455
456
457 pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
458 pcmk__node_search_cluster_member);
459
460 if (from != NULL) {
461 *from = msg->sender.uname;
462 }
463
464 if (!check_message_sanity(msg)) {
465 return NULL;
466 }
467
468 if (msg->is_compressed && (msg->size > 0)) {
469 int rc = BZ_OK;
470 unsigned int new_size = msg->size + 1;
471 char *uncompressed = pcmk__assert_alloc(1, new_size);
472
473 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
474 msg->compressed_size, 1, 0);
475 rc = pcmk__bzlib2rc(rc);
476 if ((rc == pcmk_rc_ok) && (msg->size != new_size)) {
477 rc = pcmk_rc_compression;
478 }
479 if (rc != pcmk_rc_ok) {
480 free(uncompressed);
481 crm_warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
482 " PID %" PRIu32 "): %s",
483 msg->id, ais_dest(&(msg->sender)), sender_id, pid,
484 pcmk_rc_str(rc));
485 return NULL;
486 }
487 data = uncompressed;
488
489 } else {
490 data = pcmk__str_copy(msg->data);
491 }
492
493 crm_trace("Received %sCPG message %d from %s (ID %" PRIu32
494 " PID %" PRIu32 "): %.40s...",
495 (msg->is_compressed? "compressed " : ""),
496 msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data);
497 return data;
498 }
499
500
501
502
503
504
505
506
507
508
509
510
511 static int
512 cmp_member_list_nodeid(const void *first, const void *second)
513 {
514 const struct cpg_address *const a = *((const struct cpg_address **) first),
515 *const b = *((const struct cpg_address **) second);
516 if (a->nodeid < b->nodeid) {
517 return -1;
518 } else if (a->nodeid > b->nodeid) {
519 return 1;
520 }
521
522 return 0;
523 }
524
525
526
527
528
529
530
531
532
533 static const char *
534 cpgreason2str(cpg_reason_t reason)
535 {
536 switch (reason) {
537 case CPG_REASON_JOIN: return " via cpg_join";
538 case CPG_REASON_LEAVE: return " via cpg_leave";
539 case CPG_REASON_NODEDOWN: return " via cluster exit";
540 case CPG_REASON_NODEUP: return " via cluster join";
541 case CPG_REASON_PROCDOWN: return " for unknown reason";
542 default: break;
543 }
544 return "";
545 }
546
547
548
549
550
551
552
553
554
555 static inline const char *
556 peer_name(const pcmk__node_status_t *peer)
557 {
558 return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
559 }
560
561
562
563
564
565
566
567
568
569
570
571
572 static void
573 node_left(const char *cpg_group_name, int event_counter,
574 uint32_t local_nodeid, const struct cpg_address *cpg_peer,
575 const struct cpg_address **sorted_member_list,
576 size_t member_list_entries)
577 {
578 pcmk__node_status_t *peer =
579 pcmk__search_node_caches(cpg_peer->nodeid, NULL,
580 pcmk__node_search_cluster_member);
581 const struct cpg_address **rival = NULL;
582
583
584
585
586
587
588
589
590
591
592
593
594
595 if (peer != NULL) {
596 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
597 sizeof(const struct cpg_address *),
598 cmp_member_list_nodeid);
599 }
600
601 if (rival == NULL) {
602 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
603 cpg_group_name, event_counter, peer_name(peer),
604 cpg_peer->nodeid, cpg_peer->pid,
605 cpgreason2str(cpg_peer->reason));
606 if (peer != NULL) {
607 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
608 PCMK_VALUE_OFFLINE);
609 }
610 } else if (cpg_peer->nodeid == local_nodeid) {
611 crm_warn("Group %s event %d: duplicate local pid %u left%s",
612 cpg_group_name, event_counter,
613 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
614 } else {
615 crm_warn("Group %s event %d: "
616 "%s (node %u) duplicate pid %u left%s (%u remains)",
617 cpg_group_name, event_counter, peer_name(peer),
618 cpg_peer->nodeid, cpg_peer->pid,
619 cpgreason2str(cpg_peer->reason), (*rival)->pid);
620 }
621 }
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639 void
640 pcmk__cpg_confchg_cb(cpg_handle_t handle,
641 const struct cpg_name *group_name,
642 const struct cpg_address *member_list,
643 size_t member_list_entries,
644 const struct cpg_address *left_list,
645 size_t left_list_entries,
646 const struct cpg_address *joined_list,
647 size_t joined_list_entries)
648 {
649 static int counter = 0;
650
651 bool found = false;
652 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
653 const struct cpg_address **sorted = NULL;
654
655 sorted = pcmk__assert_alloc(member_list_entries,
656 sizeof(const struct cpg_address *));
657
658 for (size_t iter = 0; iter < member_list_entries; iter++) {
659 sorted[iter] = member_list + iter;
660 }
661
662
663 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
664 cmp_member_list_nodeid);
665
666 for (int i = 0; i < left_list_entries; i++) {
667 node_left(group_name->value, counter, local_nodeid, &left_list[i],
668 sorted, member_list_entries);
669 }
670 free(sorted);
671 sorted = NULL;
672
673 for (int i = 0; i < joined_list_entries; i++) {
674 crm_info("Group %s event %d: node %u pid %u joined%s",
675 group_name->value, counter, joined_list[i].nodeid,
676 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
677 }
678
679 for (int i = 0; i < member_list_entries; i++) {
680 pcmk__node_status_t *peer =
681 pcmk__get_node(member_list[i].nodeid, NULL, NULL,
682 pcmk__node_search_cluster_member);
683
684 if (member_list[i].nodeid == local_nodeid
685 && member_list[i].pid != getpid()) {
686
687 crm_warn("Group %s event %d: detected duplicate local pid %u",
688 group_name->value, counter, member_list[i].pid);
689 continue;
690 }
691 crm_info("Group %s event %d: %s (node %u pid %u) is member",
692 group_name->value, counter, peer_name(peer),
693 member_list[i].nodeid, member_list[i].pid);
694
695
696
697
698 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
699 PCMK_VALUE_ONLINE);
700
701 if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
702
703
704
705
706
707
708
709 time_t now = time(NULL);
710
711 if (peer->when_lost == 0) {
712
713 peer->when_lost = now;
714
715 } else if (now > (peer->when_lost + 60)) {
716
717 crm_warn("Node %u is member of group %s but was believed "
718 "offline",
719 member_list[i].nodeid, group_name->value);
720 pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
721 }
722 }
723
724 if (local_nodeid == member_list[i].nodeid) {
725 found = true;
726 }
727 }
728
729 if (!found) {
730 crm_err("Local node was evicted from group %s", group_name->value);
731 cpg_evicted = true;
732 }
733
734 counter++;
735 }
736
737
738
739
740
741
742
743
744
745 int
746 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
747 {
748 if (cluster == NULL) {
749 return EINVAL;
750 }
751 cluster->cpg.cpg_deliver_fn = fn;
752 return pcmk_rc_ok;
753 }
754
755
756
757
758
759
760
761
762
763 int
764 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
765 {
766 if (cluster == NULL) {
767 return EINVAL;
768 }
769 cluster->cpg.cpg_confchg_fn = fn;
770 return pcmk_rc_ok;
771 }
772
773
774
775
776
777
778
779
780 int
781 pcmk__cpg_connect(pcmk_cluster_t *cluster)
782 {
783 cs_error_t rc;
784 int fd = -1;
785 int retries = 0;
786 uint32_t id = 0;
787 pcmk__node_status_t *peer = NULL;
788 cpg_handle_t handle = 0;
789 const char *cpg_group_name = NULL;
790 uid_t found_uid = 0;
791 gid_t found_gid = 0;
792 pid_t found_pid = 0;
793 int rv;
794
795 struct mainloop_fd_callbacks cpg_fd_callbacks = {
796 .dispatch = pcmk_cpg_dispatch,
797 .destroy = cluster->destroy,
798 };
799
800 cpg_model_v1_data_t cpg_model_info = {
801 .model = CPG_MODEL_V1,
802 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
803 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
804 .cpg_totem_confchg_fn = NULL,
805 .flags = 0,
806 };
807
808 cpg_evicted = false;
809
810 cpg_group_name = pcmk__server_message_type(cluster->priv->server);
811 if (cpg_group_name == NULL) {
812
813
814
815
816 cpg_group_name = pcmk__s(crm_system_name, "unknown");
817 }
818 memset(cluster->priv->group.value, 0, 128);
819 strncpy(cluster->priv->group.value, cpg_group_name, 127);
820 cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;
821
822 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
823 if (rc != CS_OK) {
824 crm_err("Could not connect to the CPG API: %s (%d)",
825 cs_strerror(rc), rc);
826 goto bail;
827 }
828
829 rc = cpg_fd_get(handle, &fd);
830 if (rc != CS_OK) {
831 crm_err("Could not obtain the CPG API connection: %s (%d)",
832 cs_strerror(rc), rc);
833 goto bail;
834 }
835
836
837 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
838 &found_uid, &found_gid))) {
839 crm_err("CPG provider is not authentic:"
840 " process %lld (uid: %lld, gid: %lld)",
841 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
842 (long long) found_uid, (long long) found_gid);
843 rc = CS_ERR_ACCESS;
844 goto bail;
845 } else if (rv < 0) {
846 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
847 strerror(-rv), -rv);
848 rc = CS_ERR_ACCESS;
849 goto bail;
850 }
851
852 id = pcmk__cpg_local_nodeid(handle);
853 if (id == 0) {
854 crm_err("Could not get local node id from the CPG API");
855 goto bail;
856
857 }
858 cluster->priv->node_id = id;
859
860 retries = 0;
861 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
862 if (rc != CS_OK) {
863 crm_err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
864 goto bail;
865 }
866
867 pcmk_cpg_handle = handle;
868 cluster->priv->cpg_handle = handle;
869 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
870
871 bail:
872 if (rc != CS_OK) {
873 cpg_finalize(handle);
874
875 return ENOTCONN;
876 }
877
878 peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
879 crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
880 return pcmk_rc_ok;
881 }
882
883
884
885
886
887
888
889 void
890 pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
891 {
892 pcmk_cpg_handle = 0;
893 if (cluster->priv->cpg_handle != 0) {
894 crm_trace("Disconnecting CPG");
895 cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
896 cpg_finalize(cluster->priv->cpg_handle);
897 cluster->priv->cpg_handle = 0;
898
899 } else {
900 crm_info("No CPG connection");
901 }
902 }
903
904
905
906
907
908
909
910
911
912
913
914 static bool
915 send_cpg_text(const char *data, const pcmk__node_status_t *node,
916 enum pcmk_ipc_server dest)
917 {
918 static int msg_id = 0;
919 static int local_pid = 0;
920 static int local_name_len = 0;
921 static const char *local_name = NULL;
922
923 char *target = NULL;
924 struct iovec *iov;
925 pcmk__cpg_msg_t *msg = NULL;
926
927 if (local_name == NULL) {
928 local_name = pcmk__cluster_local_node_name();
929 }
930 if ((local_name_len == 0) && (local_name != NULL)) {
931 local_name_len = strlen(local_name);
932 }
933
934 if (data == NULL) {
935 data = "";
936 }
937
938 if (local_pid == 0) {
939 local_pid = getpid();
940 }
941
942 msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
943
944 msg_id++;
945 msg->id = msg_id;
946 msg->header.error = CS_OK;
947
948 msg->host.type = dest;
949
950 if (node != NULL) {
951 if (node->name != NULL) {
952 target = pcmk__str_copy(node->name);
953 msg->host.size = strlen(node->name);
954 memset(msg->host.uname, 0, MAX_NAME);
955 memcpy(msg->host.uname, node->name, msg->host.size);
956
957 } else {
958 target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id);
959 }
960 msg->host.id = node->cluster_layer_id;
961
962 } else {
963 target = pcmk__str_copy("all");
964 }
965
966 msg->sender.id = 0;
967 msg->sender.type = pcmk__parse_server(crm_system_name);
968 msg->sender.pid = local_pid;
969 msg->sender.size = local_name_len;
970 memset(msg->sender.uname, 0, MAX_NAME);
971
972 if ((local_name != NULL) && (msg->sender.size != 0)) {
973 memcpy(msg->sender.uname, local_name, msg->sender.size);
974 }
975
976 msg->size = 1 + strlen(data);
977 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
978
979 if (msg->size < CRM_BZ2_THRESHOLD) {
980 msg = pcmk__realloc(msg, msg->header.size);
981 memcpy(msg->data, data, msg->size);
982
983 } else {
984 char *compressed = NULL;
985 unsigned int new_size = 0;
986
987 if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
988 &new_size) == pcmk_rc_ok) {
989
990 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
991 msg = pcmk__realloc(msg, msg->header.size);
992 memcpy(msg->data, compressed, new_size);
993
994 msg->is_compressed = TRUE;
995 msg->compressed_size = new_size;
996
997 } else {
998
999
1000 msg = pcmk__realloc(msg, msg->header.size);
1001 memcpy(msg->data, data, msg->size);
1002 }
1003
1004 free(compressed);
1005 }
1006
1007 iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1008 iov->iov_base = msg;
1009 iov->iov_len = msg->header.size;
1010
1011 if (msg->compressed_size > 0) {
1012 crm_trace("Queueing CPG message %u to %s "
1013 "(%llu bytes, %d bytes compressed payload): %.200s",
1014 msg->id, target, (unsigned long long) iov->iov_len,
1015 msg->compressed_size, data);
1016 } else {
1017 crm_trace("Queueing CPG message %u to %s "
1018 "(%llu bytes, %d bytes payload): %.200s",
1019 msg->id, target, (unsigned long long) iov->iov_len,
1020 msg->size, data);
1021 }
1022
1023 free(target);
1024
1025 cs_message_queue = g_list_append(cs_message_queue, iov);
1026 crm_cs_flush(&pcmk_cpg_handle);
1027
1028 return true;
1029 }
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041 bool
1042 pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
1043 enum pcmk_ipc_server dest)
1044 {
1045 bool rc = true;
1046 GString *data = g_string_sized_new(1024);
1047
1048 pcmk__xml_string(msg, 0, data, 0);
1049
1050 rc = send_cpg_text(data->str, node, dest);
1051 g_string_free(data, TRUE);
1052 return rc;
1053 }