This source file includes following definitions.
- pcmk__cpg_local_nodeid
- crm_cs_flush_cb
- crm_cs_flush
- pcmk_cpg_dispatch
- ais_dest
- msg_type2text
- check_message_sanity
- pcmk__cpg_message_data
- cmp_member_list_nodeid
- cpgreason2str
- peer_name
- node_left
- pcmk__cpg_confchg_cb
- pcmk_cpg_set_deliver_fn
- pcmk_cpg_set_confchg_fn
- pcmk__cpg_connect
- pcmk__cpg_disconnect
- send_cpg_text
- pcmk__cpg_send_xml
- cluster_connect_cpg
- cluster_disconnect_cpg
- get_local_nodeid
- pcmk_cpg_membership
- send_cluster_text
- pcmk_message_common_cs
- text2msg_type
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11
12 #include <arpa/inet.h>
13 #include <inttypes.h>
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdbool.h>
17 #include <stdint.h>
18 #include <sys/socket.h>
19 #include <sys/types.h>
20 #include <sys/utsname.h>
21
22 #include <bzlib.h>
23 #include <corosync/corodefs.h>
24 #include <corosync/corotypes.h>
25 #include <corosync/hdb.h>
26 #include <corosync/cpg.h>
27 #include <qb/qbipc_common.h>
28 #include <qb/qbipcc.h>
29 #include <qb/qbutil.h>
30
31 #include <crm/cluster/internal.h>
32 #include <crm/common/ipc.h>
33 #include <crm/common/ipc_internal.h>
34 #include <crm/common/mainloop.h>
35 #include <crm/common/xml.h>
36
37 #include "crmcluster_private.h"
38
39
40
41
42 static cpg_handle_t pcmk_cpg_handle = 0;
43
44
45 static bool cpg_evicted = false;
46 static GList *cs_message_queue = NULL;
47 static int cs_message_timer = 0;
48
49 struct pcmk__cpg_host_s {
50 uint32_t id;
51 uint32_t pid;
52 gboolean local;
53 enum crm_ais_msg_types type;
54 uint32_t size;
55 char uname[MAX_NAME];
56 } __attribute__ ((packed));
57
58 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
59
60 struct pcmk__cpg_msg_s {
61 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
62 uint32_t id;
63 gboolean is_compressed;
64
65 pcmk__cpg_host_t host;
66 pcmk__cpg_host_t sender;
67
68 uint32_t size;
69 uint32_t compressed_size;
70
71 char data[0];
72
73 } __attribute__ ((packed));
74
75 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
76
77 static void crm_cs_flush(gpointer data);
78
79 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
80
81 #define cs_repeat(rc, counter, max, code) do { \
82 rc = code; \
83 if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \
84 counter++; \
85 crm_debug("Retrying operation after %ds", counter); \
86 sleep(counter); \
87 } else { \
88 break; \
89 } \
90 } while (counter < max)
91
92
93
94
95
96
97
98
99
100 uint32_t
101 pcmk__cpg_local_nodeid(cpg_handle_t handle)
102 {
103 cs_error_t rc = CS_OK;
104 int retries = 0;
105 static uint32_t local_nodeid = 0;
106 cpg_handle_t local_handle = handle;
107 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
108 int fd = -1;
109 uid_t found_uid = 0;
110 gid_t found_gid = 0;
111 pid_t found_pid = 0;
112 int rv = 0;
113
114 if (local_nodeid != 0) {
115 return local_nodeid;
116 }
117
118 if (handle == 0) {
119 crm_trace("Creating connection");
120 cs_repeat(rc, retries, 5,
121 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
122 (cpg_model_data_t *) &cpg_model_info,
123 NULL));
124 if (rc != CS_OK) {
125 crm_err("Could not connect to the CPG API: %s (%d)",
126 cs_strerror(rc), rc);
127 return 0;
128 }
129
130 rc = cpg_fd_get(local_handle, &fd);
131 if (rc != CS_OK) {
132 crm_err("Could not obtain the CPG API connection: %s (%d)",
133 cs_strerror(rc), rc);
134 goto bail;
135 }
136
137
138 rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
139 &found_uid, &found_gid);
140 if (rv == 0) {
141 crm_err("CPG provider is not authentic:"
142 " process %lld (uid: %lld, gid: %lld)",
143 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
144 (long long) found_uid, (long long) found_gid);
145 goto bail;
146
147 } else if (rv < 0) {
148 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
149 strerror(-rv), -rv);
150 goto bail;
151 }
152 }
153
154 if (rc == CS_OK) {
155 retries = 0;
156 crm_trace("Performing lookup");
157 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
158 }
159
160 if (rc != CS_OK) {
161 crm_err("Could not get local node id from the CPG API: %s (%d)",
162 pcmk__cs_err_str(rc), rc);
163 }
164
165 bail:
166 if (handle == 0) {
167 crm_trace("Closing connection");
168 cpg_finalize(local_handle);
169 }
170 crm_debug("Local nodeid is %u", local_nodeid);
171 return local_nodeid;
172 }
173
174
175
176
177
178
179
180
181
182 static gboolean
183 crm_cs_flush_cb(gpointer data)
184 {
185 cs_message_timer = 0;
186 crm_cs_flush(data);
187 return FALSE;
188 }
189
190
191 #define CS_SEND_MAX 200
192
193
194
195
196
197
198
199 static void
200 crm_cs_flush(gpointer data)
201 {
202 unsigned int sent = 0;
203 guint queue_len = 0;
204 cs_error_t rc = 0;
205 cpg_handle_t *handle = (cpg_handle_t *) data;
206
207 if (*handle == 0) {
208 crm_trace("Connection is dead");
209 return;
210 }
211
212 queue_len = g_list_length(cs_message_queue);
213 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
214 crm_err("CPG queue has grown to %d", queue_len);
215
216 } else if (queue_len == CS_SEND_MAX) {
217 crm_warn("CPG queue has grown to %d", queue_len);
218 }
219
220 if (cs_message_timer != 0) {
221
222 crm_trace("Timer active %d", cs_message_timer);
223 return;
224 }
225
226 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
227 struct iovec *iov = cs_message_queue->data;
228
229 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
230 if (rc != CS_OK) {
231 break;
232 }
233
234 sent++;
235 crm_trace("CPG message sent, size=%llu",
236 (unsigned long long) iov->iov_len);
237
238 cs_message_queue = g_list_remove(cs_message_queue, iov);
239 free(iov->iov_base);
240 free(iov);
241 }
242
243 queue_len -= sent;
244 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
245 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
246 sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
247 (int) rc);
248
249 if (cs_message_queue) {
250 uint32_t delay_ms = 100;
251 if (rc != CS_OK) {
252
253 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
254 }
255 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
256 }
257 }
258
259
260
261
262
263
264
265
266
267 static int
268 pcmk_cpg_dispatch(gpointer user_data)
269 {
270 cs_error_t rc = CS_OK;
271 pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
272
273 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
274 if (rc != CS_OK) {
275 crm_err("Connection to the CPG API failed: %s (%d)",
276 pcmk__cs_err_str(rc), rc);
277 cpg_finalize(cluster->cpg_handle);
278 cluster->cpg_handle = 0;
279 return -1;
280
281 } else if (cpg_evicted) {
282 crm_err("Evicted from CPG membership");
283 return -1;
284 }
285 return 0;
286 }
287
288 static inline const char *
289 ais_dest(const pcmk__cpg_host_t *host)
290 {
291 if (host->local) {
292 return "local";
293 } else if (host->size > 0) {
294 return host->uname;
295 } else {
296 return "<all>";
297 }
298 }
299
300 static inline const char *
301 msg_type2text(enum crm_ais_msg_types type)
302 {
303 const char *text = "unknown";
304
305 switch (type) {
306 case crm_msg_none:
307 text = "unknown";
308 break;
309 case crm_msg_ais:
310 text = "ais";
311 break;
312 case crm_msg_cib:
313 text = "cib";
314 break;
315 case crm_msg_crmd:
316 text = "crmd";
317 break;
318 case crm_msg_pe:
319 text = "pengine";
320 break;
321 case crm_msg_te:
322 text = "tengine";
323 break;
324 case crm_msg_lrmd:
325 text = "lrmd";
326 break;
327 case crm_msg_attrd:
328 text = "attrd";
329 break;
330 case crm_msg_stonithd:
331 text = "stonithd";
332 break;
333 case crm_msg_stonith_ng:
334 text = "stonith-ng";
335 break;
336 }
337 return text;
338 }
339
340
341
342
343
344
345
346
347
348 static bool
349 check_message_sanity(const pcmk__cpg_msg_t *msg)
350 {
351 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
352
353 if (payload_size < 1) {
354 crm_err("%sCPG message %d from %s invalid: "
355 "Claimed size of %d bytes is too small "
356 CRM_XS " from %s[%u] to %s@%s",
357 (msg->is_compressed? "Compressed " : ""),
358 msg->id, ais_dest(&(msg->sender)),
359 (int) msg->header.size,
360 msg_type2text(msg->sender.type), msg->sender.pid,
361 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
362 return false;
363 }
364
365 if (msg->header.error != CS_OK) {
366 crm_err("%sCPG message %d from %s invalid: "
367 "Sender indicated error %d "
368 CRM_XS " from %s[%u] to %s@%s",
369 (msg->is_compressed? "Compressed " : ""),
370 msg->id, ais_dest(&(msg->sender)),
371 msg->header.error,
372 msg_type2text(msg->sender.type), msg->sender.pid,
373 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
374 return false;
375 }
376
377 if (msg_data_len(msg) != payload_size) {
378 crm_err("%sCPG message %d from %s invalid: "
379 "Total size %d inconsistent with payload size %d "
380 CRM_XS " from %s[%u] to %s@%s",
381 (msg->is_compressed? "Compressed " : ""),
382 msg->id, ais_dest(&(msg->sender)),
383 (int) msg->header.size, (int) msg_data_len(msg),
384 msg_type2text(msg->sender.type), msg->sender.pid,
385 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
386 return false;
387 }
388
389 if (!msg->is_compressed &&
390
391
392
393 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
394 || (msg->data[msg->size - 1] != '\0'))) {
395 crm_err("CPG message %d from %s invalid: "
396 "Payload does not end at byte %llu "
397 CRM_XS " from %s[%u] to %s@%s",
398 msg->id, ais_dest(&(msg->sender)),
399 (unsigned long long) msg->size,
400 msg_type2text(msg->sender.type), msg->sender.pid,
401 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
402 return false;
403 }
404
405 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
406 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
407 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
408 ais_dest(&(msg->sender)),
409 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
410 return true;
411 }
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431 char *
432 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
433 void *content, uint32_t *kind, const char **from)
434 {
435 char *data = NULL;
436 pcmk__cpg_msg_t *msg = content;
437
438 if (handle != 0) {
439
440 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
441 const char *local_name = pcmk__cluster_local_node_name();
442
443 if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
444 crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
445 ": claimed nodeid=%" PRIu32,
446 sender_id, pid, msg->sender.id);
447 return NULL;
448 }
449 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
450 crm_trace("Not for us: %" PRIu32" != %" PRIu32,
451 msg->host.id, local_nodeid);
452 return NULL;
453 }
454 if ((msg->host.size > 0)
455 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
456
457 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
458 return NULL;
459 }
460
461 msg->sender.id = sender_id;
462 if (msg->sender.size == 0) {
463 const crm_node_t *peer =
464 pcmk__get_node(sender_id, NULL, NULL,
465 pcmk__node_search_cluster_member);
466
467 if (peer->uname == NULL) {
468 crm_err("No uname for peer with nodeid=%u", sender_id);
469
470 } else {
471 crm_notice("Fixing uname for peer with nodeid=%u", sender_id);
472 msg->sender.size = strlen(peer->uname);
473 memset(msg->sender.uname, 0, MAX_NAME);
474 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
475 }
476 }
477 }
478
479 crm_trace("Got new%s message (size=%d, %d, %d)",
480 msg->is_compressed ? " compressed" : "",
481 msg_data_len(msg), msg->size, msg->compressed_size);
482
483 if (kind != NULL) {
484 *kind = msg->header.id;
485 }
486 if (from != NULL) {
487 *from = msg->sender.uname;
488 }
489
490 if (msg->is_compressed && (msg->size > 0)) {
491 int rc = BZ_OK;
492 char *uncompressed = NULL;
493 unsigned int new_size = msg->size + 1;
494
495 if (!check_message_sanity(msg)) {
496 goto badmsg;
497 }
498
499 crm_trace("Decompressing message data");
500 uncompressed = pcmk__assert_alloc(1, new_size);
501 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
502 msg->compressed_size, 1, 0);
503
504 rc = pcmk__bzlib2rc(rc);
505
506 if (rc != pcmk_rc_ok) {
507 crm_err("Decompression failed: %s " CRM_XS " rc=%d",
508 pcmk_rc_str(rc), rc);
509 free(uncompressed);
510 goto badmsg;
511 }
512
513 pcmk__assert(new_size == msg->size);
514
515 data = uncompressed;
516
517 } else if (!check_message_sanity(msg)) {
518 goto badmsg;
519
520 } else {
521 data = strdup(msg->data);
522 }
523
524
525 pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
526 pcmk__node_search_cluster_member);
527
528 crm_trace("Payload: %.200s", data);
529 return data;
530
531 badmsg:
532 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
533 " min=%d, total=%d, size=%d, bz2_size=%d",
534 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
535 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
536 msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
537 msg->header.size, msg->size, msg->compressed_size);
538
539 free(data);
540 return NULL;
541 }
542
543
544
545
546
547
548
549
550
551
552
553
554 static int
555 cmp_member_list_nodeid(const void *first, const void *second)
556 {
557 const struct cpg_address *const a = *((const struct cpg_address **) first),
558 *const b = *((const struct cpg_address **) second);
559 if (a->nodeid < b->nodeid) {
560 return -1;
561 } else if (a->nodeid > b->nodeid) {
562 return 1;
563 }
564
565 return 0;
566 }
567
568
569
570
571
572
573
574
575
576 static const char *
577 cpgreason2str(cpg_reason_t reason)
578 {
579 switch (reason) {
580 case CPG_REASON_JOIN: return " via cpg_join";
581 case CPG_REASON_LEAVE: return " via cpg_leave";
582 case CPG_REASON_NODEDOWN: return " via cluster exit";
583 case CPG_REASON_NODEUP: return " via cluster join";
584 case CPG_REASON_PROCDOWN: return " for unknown reason";
585 default: break;
586 }
587 return "";
588 }
589
590
591
592
593
594
595
596
597
598 static inline const char *
599 peer_name(const crm_node_t *peer)
600 {
601 if (peer == NULL) {
602 return "unknown node";
603 } else if (peer->uname == NULL) {
604 return "peer node";
605 } else {
606 return peer->uname;
607 }
608 }
609
610
611
612
613
614
615
616
617
618
619
620
621 static void
622 node_left(const char *cpg_group_name, int event_counter,
623 uint32_t local_nodeid, const struct cpg_address *cpg_peer,
624 const struct cpg_address **sorted_member_list,
625 size_t member_list_entries)
626 {
627 crm_node_t *peer =
628 pcmk__search_node_caches(cpg_peer->nodeid, NULL,
629 pcmk__node_search_cluster_member);
630 const struct cpg_address **rival = NULL;
631
632
633
634
635
636
637
638
639
640
641
642
643
644 if (peer != NULL) {
645 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
646 sizeof(const struct cpg_address *),
647 cmp_member_list_nodeid);
648 }
649
650 if (rival == NULL) {
651 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
652 cpg_group_name, event_counter, peer_name(peer),
653 cpg_peer->nodeid, cpg_peer->pid,
654 cpgreason2str(cpg_peer->reason));
655 if (peer != NULL) {
656 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
657 PCMK_VALUE_OFFLINE);
658 }
659 } else if (cpg_peer->nodeid == local_nodeid) {
660 crm_warn("Group %s event %d: duplicate local pid %u left%s",
661 cpg_group_name, event_counter,
662 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
663 } else {
664 crm_warn("Group %s event %d: "
665 "%s (node %u) duplicate pid %u left%s (%u remains)",
666 cpg_group_name, event_counter, peer_name(peer),
667 cpg_peer->nodeid, cpg_peer->pid,
668 cpgreason2str(cpg_peer->reason), (*rival)->pid);
669 }
670 }
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688 void
689 pcmk__cpg_confchg_cb(cpg_handle_t handle,
690 const struct cpg_name *group_name,
691 const struct cpg_address *member_list,
692 size_t member_list_entries,
693 const struct cpg_address *left_list,
694 size_t left_list_entries,
695 const struct cpg_address *joined_list,
696 size_t joined_list_entries)
697 {
698 static int counter = 0;
699
700 bool found = false;
701 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
702 const struct cpg_address **sorted = NULL;
703
704 sorted = pcmk__assert_alloc(member_list_entries,
705 sizeof(const struct cpg_address *));
706
707 for (size_t iter = 0; iter < member_list_entries; iter++) {
708 sorted[iter] = member_list + iter;
709 }
710
711
712 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
713 cmp_member_list_nodeid);
714
715 for (int i = 0; i < left_list_entries; i++) {
716 node_left(group_name->value, counter, local_nodeid, &left_list[i],
717 sorted, member_list_entries);
718 }
719 free(sorted);
720 sorted = NULL;
721
722 for (int i = 0; i < joined_list_entries; i++) {
723 crm_info("Group %s event %d: node %u pid %u joined%s",
724 group_name->value, counter, joined_list[i].nodeid,
725 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
726 }
727
728 for (int i = 0; i < member_list_entries; i++) {
729 crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
730 pcmk__node_search_cluster_member);
731
732 if (member_list[i].nodeid == local_nodeid
733 && member_list[i].pid != getpid()) {
734
735 crm_warn("Group %s event %d: detected duplicate local pid %u",
736 group_name->value, counter, member_list[i].pid);
737 continue;
738 }
739 crm_info("Group %s event %d: %s (node %u pid %u) is member",
740 group_name->value, counter, peer_name(peer),
741 member_list[i].nodeid, member_list[i].pid);
742
743
744
745
746 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
747 PCMK_VALUE_ONLINE);
748
749 if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
750
751
752
753
754
755
756
757 time_t now = time(NULL);
758
759 if (peer->when_lost == 0) {
760
761 peer->when_lost = now;
762
763 } else if (now > (peer->when_lost + 60)) {
764
765 crm_warn("Node %u is member of group %s but was believed "
766 "offline",
767 member_list[i].nodeid, group_name->value);
768 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
769 }
770 }
771
772 if (local_nodeid == member_list[i].nodeid) {
773 found = true;
774 }
775 }
776
777 if (!found) {
778 crm_err("Local node was evicted from group %s", group_name->value);
779 cpg_evicted = true;
780 }
781
782 counter++;
783 }
784
785
786
787
788
789
790
791
792
793 int
794 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
795 {
796 if (cluster == NULL) {
797 return EINVAL;
798 }
799 cluster->cpg.cpg_deliver_fn = fn;
800 return pcmk_rc_ok;
801 }
802
803
804
805
806
807
808
809
810
811 int
812 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
813 {
814 if (cluster == NULL) {
815 return EINVAL;
816 }
817 cluster->cpg.cpg_confchg_fn = fn;
818 return pcmk_rc_ok;
819 }
820
821
822
823
824
825
826
827
828 int
829 pcmk__cpg_connect(pcmk_cluster_t *cluster)
830 {
831 cs_error_t rc;
832 int fd = -1;
833 int retries = 0;
834 uint32_t id = 0;
835 crm_node_t *peer = NULL;
836 cpg_handle_t handle = 0;
837 const char *message_name = pcmk__message_name(crm_system_name);
838 uid_t found_uid = 0;
839 gid_t found_gid = 0;
840 pid_t found_pid = 0;
841 int rv;
842
843 struct mainloop_fd_callbacks cpg_fd_callbacks = {
844 .dispatch = pcmk_cpg_dispatch,
845 .destroy = cluster->destroy,
846 };
847
848 cpg_model_v1_data_t cpg_model_info = {
849 .model = CPG_MODEL_V1,
850 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
851 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
852 .cpg_totem_confchg_fn = NULL,
853 .flags = 0,
854 };
855
856 cpg_evicted = false;
857 cluster->group.length = 0;
858 cluster->group.value[0] = 0;
859
860
861 strncpy(cluster->group.value, message_name, 127);
862 cluster->group.value[127] = 0;
863 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
864
865 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
866 if (rc != CS_OK) {
867 crm_err("Could not connect to the CPG API: %s (%d)",
868 cs_strerror(rc), rc);
869 goto bail;
870 }
871
872 rc = cpg_fd_get(handle, &fd);
873 if (rc != CS_OK) {
874 crm_err("Could not obtain the CPG API connection: %s (%d)",
875 cs_strerror(rc), rc);
876 goto bail;
877 }
878
879
880 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
881 &found_uid, &found_gid))) {
882 crm_err("CPG provider is not authentic:"
883 " process %lld (uid: %lld, gid: %lld)",
884 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
885 (long long) found_uid, (long long) found_gid);
886 rc = CS_ERR_ACCESS;
887 goto bail;
888 } else if (rv < 0) {
889 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
890 strerror(-rv), -rv);
891 rc = CS_ERR_ACCESS;
892 goto bail;
893 }
894
895 id = pcmk__cpg_local_nodeid(handle);
896 if (id == 0) {
897 crm_err("Could not get local node id from the CPG API");
898 goto bail;
899
900 }
901 cluster->nodeid = id;
902
903 retries = 0;
904 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
905 if (rc != CS_OK) {
906 crm_err("Could not join the CPG group '%s': %d", message_name, rc);
907 goto bail;
908 }
909
910 pcmk_cpg_handle = handle;
911 cluster->cpg_handle = handle;
912 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
913
914 bail:
915 if (rc != CS_OK) {
916 cpg_finalize(handle);
917
918 return ENOTCONN;
919 }
920
921 peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
922 crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
923 return pcmk_rc_ok;
924 }
925
926
927
928
929
930
931
932 void
933 pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
934 {
935 pcmk_cpg_handle = 0;
936 if (cluster->cpg_handle != 0) {
937 crm_trace("Disconnecting CPG");
938 cpg_leave(cluster->cpg_handle, &cluster->group);
939 cpg_finalize(cluster->cpg_handle);
940 cluster->cpg_handle = 0;
941
942 } else {
943 crm_info("No CPG connection");
944 }
945 }
946
947
948
949
950
951
952
953
954
955
956
957
958 static bool
959 send_cpg_text(const char *data, bool local, const crm_node_t *node,
960 enum crm_ais_msg_types dest)
961 {
962
963 static int msg_id = 0;
964 static int local_pid = 0;
965 static int local_name_len = 0;
966 static const char *local_name = NULL;
967
968 char *target = NULL;
969 struct iovec *iov;
970 pcmk__cpg_msg_t *msg = NULL;
971
972 CRM_CHECK(dest != crm_msg_ais, return false);
973
974 if (local_name == NULL) {
975 local_name = pcmk__cluster_local_node_name();
976 }
977 if ((local_name_len == 0) && (local_name != NULL)) {
978 local_name_len = strlen(local_name);
979 }
980
981 if (data == NULL) {
982 data = "";
983 }
984
985 if (local_pid == 0) {
986 local_pid = getpid();
987 }
988
989 msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
990
991 msg_id++;
992 msg->id = msg_id;
993 msg->header.id = crm_class_cluster;
994 msg->header.error = CS_OK;
995
996 msg->host.type = dest;
997 msg->host.local = local;
998
999 if (node != NULL) {
1000 if (node->uname != NULL) {
1001 target = pcmk__str_copy(node->uname);
1002 msg->host.size = strlen(node->uname);
1003 memset(msg->host.uname, 0, MAX_NAME);
1004 memcpy(msg->host.uname, node->uname, msg->host.size);
1005
1006 } else {
1007 target = crm_strdup_printf("%u", node->id);
1008 }
1009 msg->host.id = node->id;
1010
1011 } else {
1012 target = pcmk__str_copy("all");
1013 }
1014
1015 msg->sender.id = 0;
1016 msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name);
1017 msg->sender.pid = local_pid;
1018 msg->sender.size = local_name_len;
1019 memset(msg->sender.uname, 0, MAX_NAME);
1020
1021 if ((local_name != NULL) && (msg->sender.size != 0)) {
1022 memcpy(msg->sender.uname, local_name, msg->sender.size);
1023 }
1024
1025 msg->size = 1 + strlen(data);
1026 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
1027
1028 if (msg->size < CRM_BZ2_THRESHOLD) {
1029 msg = pcmk__realloc(msg, msg->header.size);
1030 memcpy(msg->data, data, msg->size);
1031
1032 } else {
1033 char *compressed = NULL;
1034 unsigned int new_size = 0;
1035
1036 if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
1037 &new_size) == pcmk_rc_ok) {
1038
1039 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1040 msg = pcmk__realloc(msg, msg->header.size);
1041 memcpy(msg->data, compressed, new_size);
1042
1043 msg->is_compressed = TRUE;
1044 msg->compressed_size = new_size;
1045
1046 } else {
1047
1048
1049 msg = pcmk__realloc(msg, msg->header.size);
1050 memcpy(msg->data, data, msg->size);
1051 }
1052
1053 free(compressed);
1054 }
1055
1056 iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1057 iov->iov_base = msg;
1058 iov->iov_len = msg->header.size;
1059
1060 if (msg->compressed_size > 0) {
1061 crm_trace("Queueing CPG message %u to %s "
1062 "(%llu bytes, %d bytes compressed payload): %.200s",
1063 msg->id, target, (unsigned long long) iov->iov_len,
1064 msg->compressed_size, data);
1065 } else {
1066 crm_trace("Queueing CPG message %u to %s "
1067 "(%llu bytes, %d bytes payload): %.200s",
1068 msg->id, target, (unsigned long long) iov->iov_len,
1069 msg->size, data);
1070 }
1071
1072 free(target);
1073
1074 cs_message_queue = g_list_append(cs_message_queue, iov);
1075 crm_cs_flush(&pcmk_cpg_handle);
1076
1077 return true;
1078 }
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090 bool
1091 pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
1092 enum crm_ais_msg_types dest)
1093 {
1094 bool rc = true;
1095 GString *data = g_string_sized_new(1024);
1096
1097 pcmk__xml_string(msg, 0, data, 0);
1098
1099 rc = send_cpg_text(data->str, false, node, dest);
1100 g_string_free(data, TRUE);
1101 return rc;
1102 }
1103
1104
1105
1106
1107 #include <crm/cluster/compat.h>
1108
1109 gboolean
1110 cluster_connect_cpg(pcmk_cluster_t *cluster)
1111 {
1112 return pcmk__cpg_connect(cluster) == pcmk_rc_ok;
1113 }
1114
1115 void
1116 cluster_disconnect_cpg(pcmk_cluster_t *cluster)
1117 {
1118 pcmk__cpg_disconnect(cluster);
1119 }
1120
1121 uint32_t
1122 get_local_nodeid(cpg_handle_t handle)
1123 {
1124 return pcmk__cpg_local_nodeid(handle);
1125 }
1126
1127 void
1128 pcmk_cpg_membership(cpg_handle_t handle,
1129 const struct cpg_name *group_name,
1130 const struct cpg_address *member_list,
1131 size_t member_list_entries,
1132 const struct cpg_address *left_list,
1133 size_t left_list_entries,
1134 const struct cpg_address *joined_list,
1135 size_t joined_list_entries)
1136 {
1137 pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries,
1138 left_list, left_list_entries,
1139 joined_list, joined_list_entries);
1140 }
1141
1142 gboolean
1143 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
1144 gboolean local, const crm_node_t *node,
1145 enum crm_ais_msg_types dest)
1146 {
1147 switch (msg_class) {
1148 case crm_class_cluster:
1149 return send_cpg_text(data, local, node, dest);
1150 default:
1151 crm_err("Invalid message class: %d", msg_class);
1152 return FALSE;
1153 }
1154 }
1155
1156 char *
1157 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid,
1158 void *content, uint32_t *kind, const char **from)
1159 {
1160 return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from);
1161 }
1162
1163 enum crm_ais_msg_types
1164 text2msg_type(const char *text)
1165 {
1166 int type = crm_msg_none;
1167
1168 CRM_CHECK(text != NULL, return type);
1169 text = pcmk__message_name(text);
1170 if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1171 type = crm_msg_ais;
1172 } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1173 type = crm_msg_cib;
1174 } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1175 type = crm_msg_crmd;
1176 } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1177 type = crm_msg_te;
1178 } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1179 type = crm_msg_pe;
1180 } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1181 type = crm_msg_lrmd;
1182 } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1183 type = crm_msg_stonithd;
1184 } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1185 type = crm_msg_stonith_ng;
1186 } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1187 type = crm_msg_attrd;
1188
1189 } else {
1190
1191
1192
1193 int scan_rc = sscanf(text, "%d", &type);
1194
1195 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1196
1197 type = crm_msg_none;
1198 }
1199 }
1200 return type;
1201 }
1202
1203
1204