This source file includes following definitions.
- cluster_disconnect_cpg
- get_local_nodeid
- crm_cs_flush_cb
- crm_cs_flush
- pcmk_cpg_dispatch
- ais_dest
- msg_type2text
- check_message_sanity
- pcmk_message_common_cs
- cmp_member_list_nodeid
- cpgreason2str
- peer_name
- pcmk_cpg_membership
- cluster_connect_cpg
- pcmk__cpg_send_xml
- send_cluster_text
- text2msg_type
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21
22 #include <qb/qbipc_common.h>
23 #include <qb/qbipcc.h>
24 #include <qb/qbutil.h>
25
26 #include <corosync/corodefs.h>
27 #include <corosync/corotypes.h>
28 #include <corosync/hdb.h>
29 #include <corosync/cpg.h>
30
31 #include <crm/msg_xml.h>
32
33 #include <crm/common/ipc_internal.h>
34 #include "crmcluster_private.h"
35
36
37
38
39 static cpg_handle_t pcmk_cpg_handle = 0;
40
41
42 static bool cpg_evicted = false;
43 static GList *cs_message_queue = NULL;
44 static int cs_message_timer = 0;
45
46 struct pcmk__cpg_host_s {
47 uint32_t id;
48 uint32_t pid;
49 gboolean local;
50 enum crm_ais_msg_types type;
51 uint32_t size;
52 char uname[MAX_NAME];
53 } __attribute__ ((packed));
54
55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56
57 struct pcmk__cpg_msg_s {
58 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59 uint32_t id;
60 gboolean is_compressed;
61
62 pcmk__cpg_host_t host;
63 pcmk__cpg_host_t sender;
64
65 uint32_t size;
66 uint32_t compressed_size;
67
68 char data[0];
69
70 } __attribute__ ((packed));
71
72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73
74 static void crm_cs_flush(gpointer data);
75
76 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
77
78 #define cs_repeat(rc, counter, max, code) do { \
79 rc = code; \
80 if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \
81 counter++; \
82 crm_debug("Retrying operation after %ds", counter); \
83 sleep(counter); \
84 } else { \
85 break; \
86 } \
87 } while (counter < max)
88
89
90
91
92
93
94 void
95 cluster_disconnect_cpg(crm_cluster_t *cluster)
96 {
97 pcmk_cpg_handle = 0;
98 if (cluster->cpg_handle) {
99 crm_trace("Disconnecting CPG");
100 cpg_leave(cluster->cpg_handle, &cluster->group);
101 cpg_finalize(cluster->cpg_handle);
102 cluster->cpg_handle = 0;
103
104 } else {
105 crm_info("No CPG connection");
106 }
107 }
108
109
110
111
112
113
114
115
116 uint32_t
117 get_local_nodeid(cpg_handle_t handle)
118 {
119 cs_error_t rc = CS_OK;
120 int retries = 0;
121 static uint32_t local_nodeid = 0;
122 cpg_handle_t local_handle = handle;
123 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
124 int fd = -1;
125 uid_t found_uid = 0;
126 gid_t found_gid = 0;
127 pid_t found_pid = 0;
128 int rv;
129
130 if(local_nodeid != 0) {
131 return local_nodeid;
132 }
133
134 if(handle == 0) {
135 crm_trace("Creating connection");
136 cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
137 if (rc != CS_OK) {
138 crm_err("Could not connect to the CPG API: %s (%d)",
139 cs_strerror(rc), rc);
140 return 0;
141 }
142
143 rc = cpg_fd_get(local_handle, &fd);
144 if (rc != CS_OK) {
145 crm_err("Could not obtain the CPG API connection: %s (%d)",
146 cs_strerror(rc), rc);
147 goto bail;
148 }
149
150
151 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152 &found_uid, &found_gid))) {
153 crm_err("CPG provider is not authentic:"
154 " process %lld (uid: %lld, gid: %lld)",
155 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156 (long long) found_uid, (long long) found_gid);
157 goto bail;
158 } else if (rv < 0) {
159 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160 strerror(-rv), -rv);
161 goto bail;
162 }
163 }
164
165 if (rc == CS_OK) {
166 retries = 0;
167 crm_trace("Performing lookup");
168 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169 }
170
171 if (rc != CS_OK) {
172 crm_err("Could not get local node id from the CPG API: %s (%d)",
173 pcmk__cs_err_str(rc), rc);
174 }
175
176 bail:
177 if(handle == 0) {
178 crm_trace("Closing connection");
179 cpg_finalize(local_handle);
180 }
181 crm_debug("Local nodeid is %u", local_nodeid);
182 return local_nodeid;
183 }
184
185
186
187
188
189
190
191
192
193 static gboolean
194 crm_cs_flush_cb(gpointer data)
195 {
196 cs_message_timer = 0;
197 crm_cs_flush(data);
198 return FALSE;
199 }
200
201
202 #define CS_SEND_MAX 200
203
204
205
206
207
208
209
210 static void
211 crm_cs_flush(gpointer data)
212 {
213 unsigned int sent = 0;
214 guint queue_len = 0;
215 cs_error_t rc = 0;
216 cpg_handle_t *handle = (cpg_handle_t *) data;
217
218 if (*handle == 0) {
219 crm_trace("Connection is dead");
220 return;
221 }
222
223 queue_len = g_list_length(cs_message_queue);
224 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225 crm_err("CPG queue has grown to %d", queue_len);
226
227 } else if (queue_len == CS_SEND_MAX) {
228 crm_warn("CPG queue has grown to %d", queue_len);
229 }
230
231 if (cs_message_timer != 0) {
232
233 crm_trace("Timer active %d", cs_message_timer);
234 return;
235 }
236
237 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238 struct iovec *iov = cs_message_queue->data;
239
240 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241 if (rc != CS_OK) {
242 break;
243 }
244
245 sent++;
246 crm_trace("CPG message sent, size=%llu",
247 (unsigned long long) iov->iov_len);
248
249 cs_message_queue = g_list_remove(cs_message_queue, iov);
250 free(iov->iov_base);
251 free(iov);
252 }
253
254 queue_len -= sent;
255 if ((sent > 1) || (cs_message_queue != NULL)) {
256 crm_info("Sent %u CPG messages (%d remaining): %s (%d)",
257 sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
258 } else {
259 crm_trace("Sent %u CPG messages (%d remaining): %s (%d)",
260 sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
261 }
262
263 if (cs_message_queue) {
264 uint32_t delay_ms = 100;
265 if (rc != CS_OK) {
266
267 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
268 }
269 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
270 }
271 }
272
273
274
275
276
277
278
279
280
281 static int
282 pcmk_cpg_dispatch(gpointer user_data)
283 {
284 cs_error_t rc = CS_OK;
285 crm_cluster_t *cluster = (crm_cluster_t *) user_data;
286
287 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
288 if (rc != CS_OK) {
289 crm_err("Connection to the CPG API failed: %s (%d)",
290 pcmk__cs_err_str(rc), rc);
291 cpg_finalize(cluster->cpg_handle);
292 cluster->cpg_handle = 0;
293 return -1;
294
295 } else if (cpg_evicted) {
296 crm_err("Evicted from CPG membership");
297 return -1;
298 }
299 return 0;
300 }
301
302 static inline const char *
303 ais_dest(const pcmk__cpg_host_t *host)
304 {
305 if (host->local) {
306 return "local";
307 } else if (host->size > 0) {
308 return host->uname;
309 } else {
310 return "<all>";
311 }
312 }
313
314 static inline const char *
315 msg_type2text(enum crm_ais_msg_types type)
316 {
317 const char *text = "unknown";
318
319 switch (type) {
320 case crm_msg_none:
321 text = "unknown";
322 break;
323 case crm_msg_ais:
324 text = "ais";
325 break;
326 case crm_msg_cib:
327 text = "cib";
328 break;
329 case crm_msg_crmd:
330 text = "crmd";
331 break;
332 case crm_msg_pe:
333 text = "pengine";
334 break;
335 case crm_msg_te:
336 text = "tengine";
337 break;
338 case crm_msg_lrmd:
339 text = "lrmd";
340 break;
341 case crm_msg_attrd:
342 text = "attrd";
343 break;
344 case crm_msg_stonithd:
345 text = "stonithd";
346 break;
347 case crm_msg_stonith_ng:
348 text = "stonith-ng";
349 break;
350 }
351 return text;
352 }
353
354
355
356
357
358
359
360
361
362 static bool
363 check_message_sanity(const pcmk__cpg_msg_t *msg)
364 {
365 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
366
367 if (payload_size < 1) {
368 crm_err("%sCPG message %d from %s invalid: "
369 "Claimed size of %d bytes is too small "
370 CRM_XS " from %s[%u] to %s@%s",
371 (msg->is_compressed? "Compressed " : ""),
372 msg->id, ais_dest(&(msg->sender)),
373 (int) msg->header.size,
374 msg_type2text(msg->sender.type), msg->sender.pid,
375 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
376 return false;
377 }
378
379 if (msg->header.error != CS_OK) {
380 crm_err("%sCPG message %d from %s invalid: "
381 "Sender indicated error %d "
382 CRM_XS " from %s[%u] to %s@%s",
383 (msg->is_compressed? "Compressed " : ""),
384 msg->id, ais_dest(&(msg->sender)),
385 msg->header.error,
386 msg_type2text(msg->sender.type), msg->sender.pid,
387 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
388 return false;
389 }
390
391 if (msg_data_len(msg) != payload_size) {
392 crm_err("%sCPG message %d from %s invalid: "
393 "Total size %d inconsistent with payload size %d "
394 CRM_XS " from %s[%u] to %s@%s",
395 (msg->is_compressed? "Compressed " : ""),
396 msg->id, ais_dest(&(msg->sender)),
397 (int) msg->header.size, (int) msg_data_len(msg),
398 msg_type2text(msg->sender.type), msg->sender.pid,
399 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
400 return false;
401 }
402
403 if (!msg->is_compressed &&
404
405
406
407 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
408 || (msg->data[msg->size - 1] != '\0'))) {
409 crm_err("CPG message %d from %s invalid: "
410 "Payload does not end at byte %llu "
411 CRM_XS " from %s[%u] to %s@%s",
412 msg->id, ais_dest(&(msg->sender)),
413 (unsigned long long) msg->size,
414 msg_type2text(msg->sender.type), msg->sender.pid,
415 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
416 return false;
417 }
418
419 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
420 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
421 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
422 ais_dest(&(msg->sender)),
423 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
424 return true;
425 }
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443 char *
444 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
445 uint32_t *kind, const char **from)
446 {
447 char *data = NULL;
448 pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
449
450 if(handle) {
451
452 uint32_t local_nodeid = get_local_nodeid(handle);
453 const char *local_name = get_local_node_name();
454
455 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
456 crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
457 return NULL;
458
459 } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
460
461 crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
462 return NULL;
463 } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
464
465 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
466 return NULL;
467 }
468
469 msg->sender.id = nodeid;
470 if (msg->sender.size == 0) {
471 crm_node_t *peer = crm_get_peer(nodeid, NULL);
472
473 if (peer == NULL) {
474 crm_err("Peer with nodeid=%u is unknown", nodeid);
475
476 } else if (peer->uname == NULL) {
477 crm_err("No uname for peer with nodeid=%u", nodeid);
478
479 } else {
480 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
481 msg->sender.size = strlen(peer->uname);
482 memset(msg->sender.uname, 0, MAX_NAME);
483 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
484 }
485 }
486 }
487
488 crm_trace("Got new%s message (size=%d, %d, %d)",
489 msg->is_compressed ? " compressed" : "",
490 msg_data_len(msg), msg->size, msg->compressed_size);
491
492 if (kind != NULL) {
493 *kind = msg->header.id;
494 }
495 if (from != NULL) {
496 *from = msg->sender.uname;
497 }
498
499 if (msg->is_compressed && msg->size > 0) {
500 int rc = BZ_OK;
501 char *uncompressed = NULL;
502 unsigned int new_size = msg->size + 1;
503
504 if (!check_message_sanity(msg)) {
505 goto badmsg;
506 }
507
508 crm_trace("Decompressing message data");
509 uncompressed = calloc(1, new_size);
510 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
511
512 if (rc != BZ_OK) {
513 crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
514 bz2_strerror(rc), rc);
515 free(uncompressed);
516 goto badmsg;
517 }
518
519 CRM_ASSERT(rc == BZ_OK);
520 CRM_ASSERT(new_size == msg->size);
521
522 data = uncompressed;
523
524 } else if (!check_message_sanity(msg)) {
525 goto badmsg;
526
527 } else {
528 data = strdup(msg->data);
529 }
530
531
532 crm_get_peer(msg->sender.id, msg->sender.uname);
533
534 crm_trace("Payload: %.200s", data);
535 return data;
536
537 badmsg:
538 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
539 " min=%d, total=%d, size=%d, bz2_size=%d",
540 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
541 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
542 msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
543 msg->header.size, msg->size, msg->compressed_size);
544
545 free(data);
546 return NULL;
547 }
548
549
550
551
552
553
554
555
556
557
558
559
560 static int
561 cmp_member_list_nodeid(const void *first, const void *second)
562 {
563 const struct cpg_address *const a = *((const struct cpg_address **) first),
564 *const b = *((const struct cpg_address **) second);
565 if (a->nodeid < b->nodeid) {
566 return -1;
567 } else if (a->nodeid > b->nodeid) {
568 return 1;
569 }
570
571 return 0;
572 }
573
574
575
576
577
578
579
580
581
582 static const char *
583 cpgreason2str(cpg_reason_t reason)
584 {
585 switch (reason) {
586 case CPG_REASON_JOIN: return " via cpg_join";
587 case CPG_REASON_LEAVE: return " via cpg_leave";
588 case CPG_REASON_NODEDOWN: return " via cluster exit";
589 case CPG_REASON_NODEUP: return " via cluster join";
590 case CPG_REASON_PROCDOWN: return " for unknown reason";
591 default: break;
592 }
593 return "";
594 }
595
596
597
598
599
600
601
602
603
604 static inline const char *
605 peer_name(crm_node_t *peer)
606 {
607 if (peer == NULL) {
608 return "unknown node";
609 } else if (peer->uname == NULL) {
610 return "peer node";
611 } else {
612 return peer->uname;
613 }
614 }
615
616
617
618
619
620
621
622
623
624
625
626
627
628 void
629 pcmk_cpg_membership(cpg_handle_t handle,
630 const struct cpg_name *groupName,
631 const struct cpg_address *member_list, size_t member_list_entries,
632 const struct cpg_address *left_list, size_t left_list_entries,
633 const struct cpg_address *joined_list, size_t joined_list_entries)
634 {
635 int i;
636 gboolean found = FALSE;
637 static int counter = 0;
638 uint32_t local_nodeid = get_local_nodeid(handle);
639 const struct cpg_address *key, **sorted;
640
641 sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
642 CRM_ASSERT(sorted != NULL);
643
644 for (size_t iter = 0; iter < member_list_entries; iter++) {
645 sorted[iter] = member_list + iter;
646 }
647
648 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
649 cmp_member_list_nodeid);
650
651 for (i = 0; i < left_list_entries; i++) {
652 crm_node_t *peer = pcmk__search_cluster_node_cache(left_list[i].nodeid,
653 NULL);
654 const struct cpg_address **rival = NULL;
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671 if (peer) {
672 key = &left_list[i];
673 rival = bsearch(&key, sorted, member_list_entries,
674 sizeof(const struct cpg_address *),
675 cmp_member_list_nodeid);
676 }
677
678 if (rival == NULL) {
679 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
680 groupName->value, counter, peer_name(peer),
681 left_list[i].nodeid, left_list[i].pid,
682 cpgreason2str(left_list[i].reason));
683 if (peer) {
684 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
685 OFFLINESTATUS);
686 }
687 } else if (left_list[i].nodeid == local_nodeid) {
688 crm_warn("Group %s event %d: duplicate local pid %u left%s",
689 groupName->value, counter,
690 left_list[i].pid, cpgreason2str(left_list[i].reason));
691 } else {
692 crm_warn("Group %s event %d: "
693 "%s (node %u) duplicate pid %u left%s (%u remains)",
694 groupName->value, counter, peer_name(peer),
695 left_list[i].nodeid, left_list[i].pid,
696 cpgreason2str(left_list[i].reason), (*rival)->pid);
697 }
698 }
699 free(sorted);
700 sorted = NULL;
701
702 for (i = 0; i < joined_list_entries; i++) {
703 crm_info("Group %s event %d: node %u pid %u joined%s",
704 groupName->value, counter, joined_list[i].nodeid,
705 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
706 }
707
708 for (i = 0; i < member_list_entries; i++) {
709 crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
710
711 if (member_list[i].nodeid == local_nodeid
712 && member_list[i].pid != getpid()) {
713
714 crm_warn("Group %s event %d: detected duplicate local pid %u",
715 groupName->value, counter, member_list[i].pid);
716 continue;
717 }
718 crm_info("Group %s event %d: %s (node %u pid %u) is member",
719 groupName->value, counter, peer_name(peer),
720 member_list[i].nodeid, member_list[i].pid);
721
722
723
724
725 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
726 ONLINESTATUS);
727
728 if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
729
730
731
732
733
734
735
736 time_t now = time(NULL);
737
738 if (peer->when_lost == 0) {
739
740 peer->when_lost = now;
741
742 } else if (now > (peer->when_lost + 60)) {
743
744 crm_warn("Node %u is member of group %s but was believed offline",
745 member_list[i].nodeid, groupName->value);
746 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
747 }
748 }
749
750 if (local_nodeid == member_list[i].nodeid) {
751 found = TRUE;
752 }
753 }
754
755 if (!found) {
756 crm_err("Local node was evicted from group %s", groupName->value);
757 cpg_evicted = true;
758 }
759
760 counter++;
761 }
762
763
764
765
766
767
768
769
770 gboolean
771 cluster_connect_cpg(crm_cluster_t *cluster)
772 {
773 cs_error_t rc;
774 int fd = -1;
775 int retries = 0;
776 uint32_t id = 0;
777 crm_node_t *peer = NULL;
778 cpg_handle_t handle = 0;
779 const char *message_name = pcmk__message_name(crm_system_name);
780 uid_t found_uid = 0;
781 gid_t found_gid = 0;
782 pid_t found_pid = 0;
783 int rv;
784
785 struct mainloop_fd_callbacks cpg_fd_callbacks = {
786 .dispatch = pcmk_cpg_dispatch,
787 .destroy = cluster->destroy,
788 };
789
790 cpg_model_v1_data_t cpg_model_info = {
791 .model = CPG_MODEL_V1,
792 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
793 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
794 .cpg_totem_confchg_fn = NULL,
795 .flags = 0,
796 };
797
798 cpg_evicted = false;
799 cluster->group.length = 0;
800 cluster->group.value[0] = 0;
801
802
803 strncpy(cluster->group.value, message_name, 127);
804 cluster->group.value[127] = 0;
805 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
806
807 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
808 if (rc != CS_OK) {
809 crm_err("Could not connect to the CPG API: %s (%d)",
810 cs_strerror(rc), rc);
811 goto bail;
812 }
813
814 rc = cpg_fd_get(handle, &fd);
815 if (rc != CS_OK) {
816 crm_err("Could not obtain the CPG API connection: %s (%d)",
817 cs_strerror(rc), rc);
818 goto bail;
819 }
820
821
822 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
823 &found_uid, &found_gid))) {
824 crm_err("CPG provider is not authentic:"
825 " process %lld (uid: %lld, gid: %lld)",
826 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
827 (long long) found_uid, (long long) found_gid);
828 rc = CS_ERR_ACCESS;
829 goto bail;
830 } else if (rv < 0) {
831 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
832 strerror(-rv), -rv);
833 rc = CS_ERR_ACCESS;
834 goto bail;
835 }
836
837 id = get_local_nodeid(handle);
838 if (id == 0) {
839 crm_err("Could not get local node id from the CPG API");
840 goto bail;
841
842 }
843 cluster->nodeid = id;
844
845 retries = 0;
846 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
847 if (rc != CS_OK) {
848 crm_err("Could not join the CPG group '%s': %d", message_name, rc);
849 goto bail;
850 }
851
852 pcmk_cpg_handle = handle;
853 cluster->cpg_handle = handle;
854 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
855
856 bail:
857 if (rc != CS_OK) {
858 cpg_finalize(handle);
859 return FALSE;
860 }
861
862 peer = crm_get_peer(id, NULL);
863 crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
864 return TRUE;
865 }
866
867
868
869
870
871
872
873
874
875
876
877 gboolean
878 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
879 {
880 gboolean rc = TRUE;
881 char *data = NULL;
882
883 data = dump_xml_unformatted(msg);
884 rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
885 free(data);
886 return rc;
887 }
888
889
890
891
892
893
894
895
896
897
898
899
900
901 gboolean
902 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
903 gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
904 {
905 static int msg_id = 0;
906 static int local_pid = 0;
907 static int local_name_len = 0;
908 static const char *local_name = NULL;
909
910 char *target = NULL;
911 struct iovec *iov;
912 pcmk__cpg_msg_t *msg = NULL;
913 enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
914
915 switch (msg_class) {
916 case crm_class_cluster:
917 break;
918 default:
919 crm_err("Invalid message class: %d", msg_class);
920 return FALSE;
921 }
922
923 CRM_CHECK(dest != crm_msg_ais, return FALSE);
924
925 if (local_name == NULL) {
926 local_name = get_local_node_name();
927 }
928 if ((local_name_len == 0) && (local_name != NULL)) {
929 local_name_len = strlen(local_name);
930 }
931
932 if (data == NULL) {
933 data = "";
934 }
935
936 if (local_pid == 0) {
937 local_pid = getpid();
938 }
939
940 if (sender == crm_msg_none) {
941 sender = local_pid;
942 }
943
944 msg = calloc(1, sizeof(pcmk__cpg_msg_t));
945
946 msg_id++;
947 msg->id = msg_id;
948 msg->header.id = msg_class;
949 msg->header.error = CS_OK;
950
951 msg->host.type = dest;
952 msg->host.local = local;
953
954 if (node) {
955 if (node->uname) {
956 target = strdup(node->uname);
957 msg->host.size = strlen(node->uname);
958 memset(msg->host.uname, 0, MAX_NAME);
959 memcpy(msg->host.uname, node->uname, msg->host.size);
960 } else {
961 target = crm_strdup_printf("%u", node->id);
962 }
963 msg->host.id = node->id;
964 } else {
965 target = strdup("all");
966 }
967
968 msg->sender.id = 0;
969 msg->sender.type = sender;
970 msg->sender.pid = local_pid;
971 msg->sender.size = local_name_len;
972 memset(msg->sender.uname, 0, MAX_NAME);
973 if ((local_name != NULL) && (msg->sender.size != 0)) {
974 memcpy(msg->sender.uname, local_name, msg->sender.size);
975 }
976
977 msg->size = 1 + strlen(data);
978 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
979
980 if (msg->size < CRM_BZ2_THRESHOLD) {
981 msg = pcmk__realloc(msg, msg->header.size);
982 memcpy(msg->data, data, msg->size);
983
984 } else {
985 char *compressed = NULL;
986 unsigned int new_size = 0;
987 char *uncompressed = strdup(data);
988
989 if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
990 &compressed, &new_size) == pcmk_rc_ok) {
991
992 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
993 msg = pcmk__realloc(msg, msg->header.size);
994 memcpy(msg->data, compressed, new_size);
995
996 msg->is_compressed = TRUE;
997 msg->compressed_size = new_size;
998
999 } else {
1000
1001
1002 msg = pcmk__realloc(msg, msg->header.size);
1003 memcpy(msg->data, data, msg->size);
1004 }
1005
1006 free(uncompressed);
1007 free(compressed);
1008 }
1009
1010 iov = calloc(1, sizeof(struct iovec));
1011 iov->iov_base = msg;
1012 iov->iov_len = msg->header.size;
1013
1014 if (msg->compressed_size) {
1015 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1016 msg->id, target, (unsigned long long) iov->iov_len,
1017 msg->compressed_size, data);
1018 } else {
1019 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1020 msg->id, target, (unsigned long long) iov->iov_len,
1021 msg->size, data);
1022 }
1023 free(target);
1024
1025 cs_message_queue = g_list_append(cs_message_queue, iov);
1026 crm_cs_flush(&pcmk_cpg_handle);
1027
1028 return TRUE;
1029 }
1030
1031
1032
1033
1034
1035
1036
1037
1038 enum crm_ais_msg_types
1039 text2msg_type(const char *text)
1040 {
1041 int type = crm_msg_none;
1042
1043 CRM_CHECK(text != NULL, return type);
1044 text = pcmk__message_name(text);
1045 if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1046 type = crm_msg_ais;
1047 } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1048 type = crm_msg_cib;
1049 } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1050 type = crm_msg_crmd;
1051 } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1052 type = crm_msg_te;
1053 } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1054 type = crm_msg_pe;
1055 } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1056 type = crm_msg_lrmd;
1057 } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1058 type = crm_msg_stonithd;
1059 } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1060 type = crm_msg_stonith_ng;
1061 } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1062 type = crm_msg_attrd;
1063
1064 } else {
1065
1066
1067
1068 int scan_rc = sscanf(text, "%d", &type);
1069
1070 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1071
1072 type = crm_msg_none;
1073 }
1074 }
1075 return type;
1076 }