This source file includes the following definitions.
- cluster_disconnect_cpg
- get_local_nodeid
- crm_cs_flush_cb
- crm_cs_flush
- pcmk_cpg_dispatch
- ais_dest
- msg_type2text
- check_message_sanity
- pcmk_message_common_cs
- cmp_member_list_nodeid
- cpgreason2str
- peer_name
- node_left
- pcmk_cpg_membership
- cluster_connect_cpg
- pcmk__cpg_send_xml
- send_cluster_text
- text2msg_type
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21
22 #include <qb/qbipc_common.h>
23 #include <qb/qbipcc.h>
24 #include <qb/qbutil.h>
25
26 #include <corosync/corodefs.h>
27 #include <corosync/corotypes.h>
28 #include <corosync/hdb.h>
29 #include <corosync/cpg.h>
30
31 #include <crm/msg_xml.h>
32
33 #include <crm/common/ipc_internal.h>
34 #include "crmcluster_private.h"
35
36
37
38
/* CPG handle used when flushing the outbound message queue. It is set once
 * the group has been joined and reset to 0 when the cluster disconnects. */
39 static cpg_handle_t pcmk_cpg_handle = 0;
40
41
/* Set by the membership callback when the local node is missing from the
 * member list; the dispatch function reports it as a failure. */
42 static bool cpg_evicted = false;
/* Outbound messages (struct iovec *) waiting to be multicast, oldest first */
43 static GList *cs_message_queue = NULL;
/* ID of the pending flush timer, or 0 when no timer is scheduled */
44 static int cs_message_timer = 0;
45
46 struct pcmk__cpg_host_s {
47 uint32_t id;
48 uint32_t pid;
49 gboolean local;
50 enum crm_ais_msg_types type;
51 uint32_t size;
52 char uname[MAX_NAME];
53 } __attribute__ ((packed));
54
55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56
57 struct pcmk__cpg_msg_s {
58 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59 uint32_t id;
60 gboolean is_compressed;
61
62 pcmk__cpg_host_t host;
63 pcmk__cpg_host_t sender;
64
65 uint32_t size;
66 uint32_t compressed_size;
67
68 char data[0];
69
70 } __attribute__ ((packed));
71
72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73
74 static void crm_cs_flush(gpointer data);
75
/*
 * Number of payload bytes a message actually carries: the compressed size
 * when the payload is compressed, otherwise the plain size.
 *
 * The argument is parenthesized so that any pointer expression (for example
 * "p + 1" or "&array[i]") can be passed safely.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
77
/*
 * Evaluate \p code and store its result in \p rc, repeating for as long as
 * the result is CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL and \p counter is
 * below \p max.
 *
 * After every retryable failure \p counter is incremented and the caller
 * sleeps for that many seconds, so the wait grows by one second per attempt.
 * \p rc and \p counter must be modifiable lvalues; \p counter should be
 * initialized by the caller (normally to 0). \p code is re-evaluated on each
 * attempt. All arguments are parenthesized so expressions can be passed.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
88
89
90
91
92
93
94 void
95 cluster_disconnect_cpg(crm_cluster_t *cluster)
96 {
97 pcmk_cpg_handle = 0;
98 if (cluster->cpg_handle) {
99 crm_trace("Disconnecting CPG");
100 cpg_leave(cluster->cpg_handle, &cluster->group);
101 cpg_finalize(cluster->cpg_handle);
102 cluster->cpg_handle = 0;
103
104 } else {
105 crm_info("No CPG connection");
106 }
107 }
108
109
110
111
112
113
114
115
116 uint32_t
117 get_local_nodeid(cpg_handle_t handle)
118 {
119 cs_error_t rc = CS_OK;
120 int retries = 0;
121 static uint32_t local_nodeid = 0;
122 cpg_handle_t local_handle = handle;
123 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
124 int fd = -1;
125 uid_t found_uid = 0;
126 gid_t found_gid = 0;
127 pid_t found_pid = 0;
128 int rv;
129
130 if(local_nodeid != 0) {
131 return local_nodeid;
132 }
133
134 if(handle == 0) {
135 crm_trace("Creating connection");
136 cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
137 if (rc != CS_OK) {
138 crm_err("Could not connect to the CPG API: %s (%d)",
139 cs_strerror(rc), rc);
140 return 0;
141 }
142
143 rc = cpg_fd_get(local_handle, &fd);
144 if (rc != CS_OK) {
145 crm_err("Could not obtain the CPG API connection: %s (%d)",
146 cs_strerror(rc), rc);
147 goto bail;
148 }
149
150
151 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152 &found_uid, &found_gid))) {
153 crm_err("CPG provider is not authentic:"
154 " process %lld (uid: %lld, gid: %lld)",
155 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156 (long long) found_uid, (long long) found_gid);
157 goto bail;
158 } else if (rv < 0) {
159 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160 strerror(-rv), -rv);
161 goto bail;
162 }
163 }
164
165 if (rc == CS_OK) {
166 retries = 0;
167 crm_trace("Performing lookup");
168 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169 }
170
171 if (rc != CS_OK) {
172 crm_err("Could not get local node id from the CPG API: %s (%d)",
173 pcmk__cs_err_str(rc), rc);
174 }
175
176 bail:
177 if(handle == 0) {
178 crm_trace("Closing connection");
179 cpg_finalize(local_handle);
180 }
181 crm_debug("Local nodeid is %u", local_nodeid);
182 return local_nodeid;
183 }
184
185
186
187
188
189
190
191
192
193 static gboolean
194 crm_cs_flush_cb(gpointer data)
195 {
196 cs_message_timer = 0;
197 crm_cs_flush(data);
198 return FALSE;
199 }
200
201
202 #define CS_SEND_MAX 200
203
204
205
206
207
208
209
210 static void
211 crm_cs_flush(gpointer data)
212 {
213 unsigned int sent = 0;
214 guint queue_len = 0;
215 cs_error_t rc = 0;
216 cpg_handle_t *handle = (cpg_handle_t *) data;
217
218 if (*handle == 0) {
219 crm_trace("Connection is dead");
220 return;
221 }
222
223 queue_len = g_list_length(cs_message_queue);
224 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225 crm_err("CPG queue has grown to %d", queue_len);
226
227 } else if (queue_len == CS_SEND_MAX) {
228 crm_warn("CPG queue has grown to %d", queue_len);
229 }
230
231 if (cs_message_timer != 0) {
232
233 crm_trace("Timer active %d", cs_message_timer);
234 return;
235 }
236
237 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238 struct iovec *iov = cs_message_queue->data;
239
240 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241 if (rc != CS_OK) {
242 break;
243 }
244
245 sent++;
246 crm_trace("CPG message sent, size=%llu",
247 (unsigned long long) iov->iov_len);
248
249 cs_message_queue = g_list_remove(cs_message_queue, iov);
250 free(iov->iov_base);
251 free(iov);
252 }
253
254 queue_len -= sent;
255 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
256 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
257 sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
258 (int) rc);
259
260 if (cs_message_queue) {
261 uint32_t delay_ms = 100;
262 if (rc != CS_OK) {
263
264 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
265 }
266 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
267 }
268 }
269
270
271
272
273
274
275
276
277
278 static int
279 pcmk_cpg_dispatch(gpointer user_data)
280 {
281 cs_error_t rc = CS_OK;
282 crm_cluster_t *cluster = (crm_cluster_t *) user_data;
283
284 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
285 if (rc != CS_OK) {
286 crm_err("Connection to the CPG API failed: %s (%d)",
287 pcmk__cs_err_str(rc), rc);
288 cpg_finalize(cluster->cpg_handle);
289 cluster->cpg_handle = 0;
290 return -1;
291
292 } else if (cpg_evicted) {
293 crm_err("Evicted from CPG membership");
294 return -1;
295 }
296 return 0;
297 }
298
299 static inline const char *
300 ais_dest(const pcmk__cpg_host_t *host)
301 {
302 if (host->local) {
303 return "local";
304 } else if (host->size > 0) {
305 return host->uname;
306 } else {
307 return "<all>";
308 }
309 }
310
311 static inline const char *
312 msg_type2text(enum crm_ais_msg_types type)
313 {
314 const char *text = "unknown";
315
316 switch (type) {
317 case crm_msg_none:
318 text = "unknown";
319 break;
320 case crm_msg_ais:
321 text = "ais";
322 break;
323 case crm_msg_cib:
324 text = "cib";
325 break;
326 case crm_msg_crmd:
327 text = "crmd";
328 break;
329 case crm_msg_pe:
330 text = "pengine";
331 break;
332 case crm_msg_te:
333 text = "tengine";
334 break;
335 case crm_msg_lrmd:
336 text = "lrmd";
337 break;
338 case crm_msg_attrd:
339 text = "attrd";
340 break;
341 case crm_msg_stonithd:
342 text = "stonithd";
343 break;
344 case crm_msg_stonith_ng:
345 text = "stonith-ng";
346 break;
347 }
348 return text;
349 }
350
351
352
353
354
355
356
357
358
359 static bool
360 check_message_sanity(const pcmk__cpg_msg_t *msg)
361 {
362 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
363
364 if (payload_size < 1) {
365 crm_err("%sCPG message %d from %s invalid: "
366 "Claimed size of %d bytes is too small "
367 CRM_XS " from %s[%u] to %s@%s",
368 (msg->is_compressed? "Compressed " : ""),
369 msg->id, ais_dest(&(msg->sender)),
370 (int) msg->header.size,
371 msg_type2text(msg->sender.type), msg->sender.pid,
372 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
373 return false;
374 }
375
376 if (msg->header.error != CS_OK) {
377 crm_err("%sCPG message %d from %s invalid: "
378 "Sender indicated error %d "
379 CRM_XS " from %s[%u] to %s@%s",
380 (msg->is_compressed? "Compressed " : ""),
381 msg->id, ais_dest(&(msg->sender)),
382 msg->header.error,
383 msg_type2text(msg->sender.type), msg->sender.pid,
384 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
385 return false;
386 }
387
388 if (msg_data_len(msg) != payload_size) {
389 crm_err("%sCPG message %d from %s invalid: "
390 "Total size %d inconsistent with payload size %d "
391 CRM_XS " from %s[%u] to %s@%s",
392 (msg->is_compressed? "Compressed " : ""),
393 msg->id, ais_dest(&(msg->sender)),
394 (int) msg->header.size, (int) msg_data_len(msg),
395 msg_type2text(msg->sender.type), msg->sender.pid,
396 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
397 return false;
398 }
399
400 if (!msg->is_compressed &&
401
402
403
404 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
405 || (msg->data[msg->size - 1] != '\0'))) {
406 crm_err("CPG message %d from %s invalid: "
407 "Payload does not end at byte %llu "
408 CRM_XS " from %s[%u] to %s@%s",
409 msg->id, ais_dest(&(msg->sender)),
410 (unsigned long long) msg->size,
411 msg_type2text(msg->sender.type), msg->sender.pid,
412 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
413 return false;
414 }
415
416 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
417 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
418 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
419 ais_dest(&(msg->sender)),
420 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
421 return true;
422 }
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440 char *
441 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
442 uint32_t *kind, const char **from)
443 {
444 char *data = NULL;
445 pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
446
447 if(handle) {
448
449 uint32_t local_nodeid = get_local_nodeid(handle);
450 const char *local_name = get_local_node_name();
451
452 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
453 crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
454 return NULL;
455
456 } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
457
458 crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
459 return NULL;
460 } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
461
462 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
463 return NULL;
464 }
465
466 msg->sender.id = nodeid;
467 if (msg->sender.size == 0) {
468 crm_node_t *peer = crm_get_peer(nodeid, NULL);
469
470 if (peer == NULL) {
471 crm_err("Peer with nodeid=%u is unknown", nodeid);
472
473 } else if (peer->uname == NULL) {
474 crm_err("No uname for peer with nodeid=%u", nodeid);
475
476 } else {
477 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
478 msg->sender.size = strlen(peer->uname);
479 memset(msg->sender.uname, 0, MAX_NAME);
480 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
481 }
482 }
483 }
484
485 crm_trace("Got new%s message (size=%d, %d, %d)",
486 msg->is_compressed ? " compressed" : "",
487 msg_data_len(msg), msg->size, msg->compressed_size);
488
489 if (kind != NULL) {
490 *kind = msg->header.id;
491 }
492 if (from != NULL) {
493 *from = msg->sender.uname;
494 }
495
496 if (msg->is_compressed && msg->size > 0) {
497 int rc = BZ_OK;
498 char *uncompressed = NULL;
499 unsigned int new_size = msg->size + 1;
500
501 if (!check_message_sanity(msg)) {
502 goto badmsg;
503 }
504
505 crm_trace("Decompressing message data");
506 uncompressed = calloc(1, new_size);
507 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
508
509 rc = pcmk__bzlib2rc(rc);
510
511 if (rc != pcmk_rc_ok) {
512 crm_err("Decompression failed: %s " CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
513 free(uncompressed);
514 goto badmsg;
515 }
516
517 CRM_ASSERT(new_size == msg->size);
518
519 data = uncompressed;
520
521 } else if (!check_message_sanity(msg)) {
522 goto badmsg;
523
524 } else {
525 data = strdup(msg->data);
526 }
527
528
529 crm_get_peer(msg->sender.id, msg->sender.uname);
530
531 crm_trace("Payload: %.200s", data);
532 return data;
533
534 badmsg:
535 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
536 " min=%d, total=%d, size=%d, bz2_size=%d",
537 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
538 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
539 msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
540 msg->header.size, msg->size, msg->compressed_size);
541
542 free(data);
543 return NULL;
544 }
545
546
547
548
549
550
551
552
553
554
555
556
557 static int
558 cmp_member_list_nodeid(const void *first, const void *second)
559 {
560 const struct cpg_address *const a = *((const struct cpg_address **) first),
561 *const b = *((const struct cpg_address **) second);
562 if (a->nodeid < b->nodeid) {
563 return -1;
564 } else if (a->nodeid > b->nodeid) {
565 return 1;
566 }
567
568 return 0;
569 }
570
571
572
573
574
575
576
577
578
579 static const char *
580 cpgreason2str(cpg_reason_t reason)
581 {
582 switch (reason) {
583 case CPG_REASON_JOIN: return " via cpg_join";
584 case CPG_REASON_LEAVE: return " via cpg_leave";
585 case CPG_REASON_NODEDOWN: return " via cluster exit";
586 case CPG_REASON_NODEUP: return " via cluster join";
587 case CPG_REASON_PROCDOWN: return " for unknown reason";
588 default: break;
589 }
590 return "";
591 }
592
593
594
595
596
597
598
599
600
601 static inline const char *
602 peer_name(const crm_node_t *peer)
603 {
604 if (peer == NULL) {
605 return "unknown node";
606 } else if (peer->uname == NULL) {
607 return "peer node";
608 } else {
609 return peer->uname;
610 }
611 }
612
613
614
615
616
617
618
619
620
621
622
623
624 static void
625 node_left(const char *cpg_group_name, int event_counter,
626 uint32_t local_nodeid, const struct cpg_address *cpg_peer,
627 const struct cpg_address **sorted_member_list,
628 size_t member_list_entries)
629 {
630 crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
631 NULL, NULL);
632 const struct cpg_address **rival = NULL;
633
634
635
636
637
638
639
640
641
642
643
644
645
646 if (peer != NULL) {
647 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
648 sizeof(const struct cpg_address *),
649 cmp_member_list_nodeid);
650 }
651
652 if (rival == NULL) {
653 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
654 cpg_group_name, event_counter, peer_name(peer),
655 cpg_peer->nodeid, cpg_peer->pid,
656 cpgreason2str(cpg_peer->reason));
657 if (peer != NULL) {
658 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
659 OFFLINESTATUS);
660 }
661 } else if (cpg_peer->nodeid == local_nodeid) {
662 crm_warn("Group %s event %d: duplicate local pid %u left%s",
663 cpg_group_name, event_counter,
664 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
665 } else {
666 crm_warn("Group %s event %d: "
667 "%s (node %u) duplicate pid %u left%s (%u remains)",
668 cpg_group_name, event_counter, peer_name(peer),
669 cpg_peer->nodeid, cpg_peer->pid,
670 cpgreason2str(cpg_peer->reason), (*rival)->pid);
671 }
672 }
673
674
675
676
677
678
679
680
681
682
683
684
685
686 void
687 pcmk_cpg_membership(cpg_handle_t handle,
688 const struct cpg_name *groupName,
689 const struct cpg_address *member_list, size_t member_list_entries,
690 const struct cpg_address *left_list, size_t left_list_entries,
691 const struct cpg_address *joined_list, size_t joined_list_entries)
692 {
693 int i;
694 gboolean found = FALSE;
695 static int counter = 0;
696 uint32_t local_nodeid = get_local_nodeid(handle);
697 const struct cpg_address **sorted;
698
699 sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
700 CRM_ASSERT(sorted != NULL);
701
702 for (size_t iter = 0; iter < member_list_entries; iter++) {
703 sorted[iter] = member_list + iter;
704 }
705
706 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
707 cmp_member_list_nodeid);
708
709 for (i = 0; i < left_list_entries; i++) {
710 node_left(groupName->value, counter, local_nodeid, &left_list[i],
711 sorted, member_list_entries);
712 }
713 free(sorted);
714 sorted = NULL;
715
716 for (i = 0; i < joined_list_entries; i++) {
717 crm_info("Group %s event %d: node %u pid %u joined%s",
718 groupName->value, counter, joined_list[i].nodeid,
719 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
720 }
721
722 for (i = 0; i < member_list_entries; i++) {
723 crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
724
725 if (member_list[i].nodeid == local_nodeid
726 && member_list[i].pid != getpid()) {
727
728 crm_warn("Group %s event %d: detected duplicate local pid %u",
729 groupName->value, counter, member_list[i].pid);
730 continue;
731 }
732 crm_info("Group %s event %d: %s (node %u pid %u) is member",
733 groupName->value, counter, peer_name(peer),
734 member_list[i].nodeid, member_list[i].pid);
735
736
737
738
739 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
740 ONLINESTATUS);
741
742 if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
743
744
745
746
747
748
749
750 time_t now = time(NULL);
751
752 if (peer->when_lost == 0) {
753
754 peer->when_lost = now;
755
756 } else if (now > (peer->when_lost + 60)) {
757
758 crm_warn("Node %u is member of group %s but was believed offline",
759 member_list[i].nodeid, groupName->value);
760 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
761 }
762 }
763
764 if (local_nodeid == member_list[i].nodeid) {
765 found = TRUE;
766 }
767 }
768
769 if (!found) {
770 crm_err("Local node was evicted from group %s", groupName->value);
771 cpg_evicted = true;
772 }
773
774 counter++;
775 }
776
777
778
779
780
781
782
783
784 gboolean
785 cluster_connect_cpg(crm_cluster_t *cluster)
786 {
787 cs_error_t rc;
788 int fd = -1;
789 int retries = 0;
790 uint32_t id = 0;
791 crm_node_t *peer = NULL;
792 cpg_handle_t handle = 0;
793 const char *message_name = pcmk__message_name(crm_system_name);
794 uid_t found_uid = 0;
795 gid_t found_gid = 0;
796 pid_t found_pid = 0;
797 int rv;
798
799 struct mainloop_fd_callbacks cpg_fd_callbacks = {
800 .dispatch = pcmk_cpg_dispatch,
801 .destroy = cluster->destroy,
802 };
803
804 cpg_model_v1_data_t cpg_model_info = {
805 .model = CPG_MODEL_V1,
806 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
807 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
808 .cpg_totem_confchg_fn = NULL,
809 .flags = 0,
810 };
811
812 cpg_evicted = false;
813 cluster->group.length = 0;
814 cluster->group.value[0] = 0;
815
816
817 strncpy(cluster->group.value, message_name, 127);
818 cluster->group.value[127] = 0;
819 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
820
821 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
822 if (rc != CS_OK) {
823 crm_err("Could not connect to the CPG API: %s (%d)",
824 cs_strerror(rc), rc);
825 goto bail;
826 }
827
828 rc = cpg_fd_get(handle, &fd);
829 if (rc != CS_OK) {
830 crm_err("Could not obtain the CPG API connection: %s (%d)",
831 cs_strerror(rc), rc);
832 goto bail;
833 }
834
835
836 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
837 &found_uid, &found_gid))) {
838 crm_err("CPG provider is not authentic:"
839 " process %lld (uid: %lld, gid: %lld)",
840 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
841 (long long) found_uid, (long long) found_gid);
842 rc = CS_ERR_ACCESS;
843 goto bail;
844 } else if (rv < 0) {
845 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
846 strerror(-rv), -rv);
847 rc = CS_ERR_ACCESS;
848 goto bail;
849 }
850
851 id = get_local_nodeid(handle);
852 if (id == 0) {
853 crm_err("Could not get local node id from the CPG API");
854 goto bail;
855
856 }
857 cluster->nodeid = id;
858
859 retries = 0;
860 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
861 if (rc != CS_OK) {
862 crm_err("Could not join the CPG group '%s': %d", message_name, rc);
863 goto bail;
864 }
865
866 pcmk_cpg_handle = handle;
867 cluster->cpg_handle = handle;
868 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
869
870 bail:
871 if (rc != CS_OK) {
872 cpg_finalize(handle);
873 return FALSE;
874 }
875
876 peer = crm_get_peer(id, NULL);
877 crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
878 return TRUE;
879 }
880
881
882
883
884
885
886
887
888
889
890
891 bool
892 pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
893 enum crm_ais_msg_types dest)
894 {
895 bool rc = true;
896 char *data = NULL;
897
898 data = dump_xml_unformatted(msg);
899 rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
900 free(data);
901 return rc;
902 }
903
904
905
906
907
908
909
910
911
912
913
914
915
916 gboolean
917 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
918 gboolean local, const crm_node_t *node,
919 enum crm_ais_msg_types dest)
920 {
921 static int msg_id = 0;
922 static int local_pid = 0;
923 static int local_name_len = 0;
924 static const char *local_name = NULL;
925
926 char *target = NULL;
927 struct iovec *iov;
928 pcmk__cpg_msg_t *msg = NULL;
929 enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
930
931 switch (msg_class) {
932 case crm_class_cluster:
933 break;
934 default:
935 crm_err("Invalid message class: %d", msg_class);
936 return FALSE;
937 }
938
939 CRM_CHECK(dest != crm_msg_ais, return FALSE);
940
941 if (local_name == NULL) {
942 local_name = get_local_node_name();
943 }
944 if ((local_name_len == 0) && (local_name != NULL)) {
945 local_name_len = strlen(local_name);
946 }
947
948 if (data == NULL) {
949 data = "";
950 }
951
952 if (local_pid == 0) {
953 local_pid = getpid();
954 }
955
956 if (sender == crm_msg_none) {
957 sender = local_pid;
958 }
959
960 msg = calloc(1, sizeof(pcmk__cpg_msg_t));
961
962 msg_id++;
963 msg->id = msg_id;
964 msg->header.id = msg_class;
965 msg->header.error = CS_OK;
966
967 msg->host.type = dest;
968 msg->host.local = local;
969
970 if (node) {
971 if (node->uname) {
972 target = strdup(node->uname);
973 msg->host.size = strlen(node->uname);
974 memset(msg->host.uname, 0, MAX_NAME);
975 memcpy(msg->host.uname, node->uname, msg->host.size);
976 } else {
977 target = crm_strdup_printf("%u", node->id);
978 }
979 msg->host.id = node->id;
980 } else {
981 target = strdup("all");
982 }
983
984 msg->sender.id = 0;
985 msg->sender.type = sender;
986 msg->sender.pid = local_pid;
987 msg->sender.size = local_name_len;
988 memset(msg->sender.uname, 0, MAX_NAME);
989 if ((local_name != NULL) && (msg->sender.size != 0)) {
990 memcpy(msg->sender.uname, local_name, msg->sender.size);
991 }
992
993 msg->size = 1 + strlen(data);
994 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
995
996 if (msg->size < CRM_BZ2_THRESHOLD) {
997 msg = pcmk__realloc(msg, msg->header.size);
998 memcpy(msg->data, data, msg->size);
999
1000 } else {
1001 char *compressed = NULL;
1002 unsigned int new_size = 0;
1003 char *uncompressed = strdup(data);
1004
1005 if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
1006 &compressed, &new_size) == pcmk_rc_ok) {
1007
1008 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1009 msg = pcmk__realloc(msg, msg->header.size);
1010 memcpy(msg->data, compressed, new_size);
1011
1012 msg->is_compressed = TRUE;
1013 msg->compressed_size = new_size;
1014
1015 } else {
1016
1017
1018 msg = pcmk__realloc(msg, msg->header.size);
1019 memcpy(msg->data, data, msg->size);
1020 }
1021
1022 free(uncompressed);
1023 free(compressed);
1024 }
1025
1026 iov = calloc(1, sizeof(struct iovec));
1027 iov->iov_base = msg;
1028 iov->iov_len = msg->header.size;
1029
1030 if (msg->compressed_size) {
1031 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1032 msg->id, target, (unsigned long long) iov->iov_len,
1033 msg->compressed_size, data);
1034 } else {
1035 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1036 msg->id, target, (unsigned long long) iov->iov_len,
1037 msg->size, data);
1038 }
1039 free(target);
1040
1041 cs_message_queue = g_list_append(cs_message_queue, iov);
1042 crm_cs_flush(&pcmk_cpg_handle);
1043
1044 return TRUE;
1045 }
1046
1047
1048
1049
1050
1051
1052
1053
1054 enum crm_ais_msg_types
1055 text2msg_type(const char *text)
1056 {
1057 int type = crm_msg_none;
1058
1059 CRM_CHECK(text != NULL, return type);
1060 text = pcmk__message_name(text);
1061 if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1062 type = crm_msg_ais;
1063 } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1064 type = crm_msg_cib;
1065 } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1066 type = crm_msg_crmd;
1067 } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1068 type = crm_msg_te;
1069 } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1070 type = crm_msg_pe;
1071 } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1072 type = crm_msg_lrmd;
1073 } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1074 type = crm_msg_stonithd;
1075 } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1076 type = crm_msg_stonith_ng;
1077 } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1078 type = crm_msg_attrd;
1079
1080 } else {
1081
1082
1083
1084 int scan_rc = sscanf(text, "%d", &type);
1085
1086 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1087
1088 type = crm_msg_none;
1089 }
1090 }
1091 return type;
1092 }