This source file includes following definitions.
- cluster_disconnect_cpg
- get_local_nodeid
- crm_cs_flush_cb
- crm_cs_flush
- send_cpg_iov
- pcmk_cpg_dispatch
- pcmk_message_common_cs
- cmp_member_list_nodeid
- cpgreason2str
- peer_name
- pcmk_cpg_membership
- cluster_connect_cpg
- send_cluster_message_cs
- send_cluster_text
- text2msg_type
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29
30 #include <crm/msg_xml.h>
31
32 #include <crm/common/ipc_internal.h>
33
34 cpg_handle_t pcmk_cpg_handle = 0;
35
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38
39 #define cs_repeat(counter, max, code) do { \
40 code; \
41 if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
42 counter++; \
43 crm_debug("Retrying operation after %ds", counter); \
44 sleep(counter); \
45 } else { \
46 break; \
47 } \
48 } while(counter < max)
49
50 void
51 cluster_disconnect_cpg(crm_cluster_t *cluster)
52 {
53 pcmk_cpg_handle = 0;
54 if (cluster->cpg_handle) {
55 crm_trace("Disconnecting CPG");
56 cpg_leave(cluster->cpg_handle, &cluster->group);
57 cpg_finalize(cluster->cpg_handle);
58 cluster->cpg_handle = 0;
59
60 } else {
61 crm_info("No CPG connection");
62 }
63 }
64
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67 cs_error_t rc = CS_OK;
68 int retries = 0;
69 static uint32_t local_nodeid = 0;
70 cpg_handle_t local_handle = handle;
71 cpg_callbacks_t cb = { };
72 int fd = -1;
73 uid_t found_uid = 0;
74 gid_t found_gid = 0;
75 pid_t found_pid = 0;
76 int rv;
77
78 if(local_nodeid != 0) {
79 return local_nodeid;
80 }
81
82 if(handle == 0) {
83 crm_trace("Creating connection");
84 cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
85 if (rc != CS_OK) {
86 crm_err("Could not connect to the CPG API: %s (%d)",
87 cs_strerror(rc), rc);
88 return 0;
89 }
90
91 rc = cpg_fd_get(local_handle, &fd);
92 if (rc != CS_OK) {
93 crm_err("Could not obtain the CPG API connection: %s (%d)",
94 cs_strerror(rc), rc);
95 goto bail;
96 }
97
98
99 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
100 &found_uid, &found_gid))) {
101 crm_err("CPG provider is not authentic:"
102 " process %lld (uid: %lld, gid: %lld)",
103 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
104 (long long) found_uid, (long long) found_gid);
105 goto bail;
106 } else if (rv < 0) {
107 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
108 strerror(-rv), -rv);
109 goto bail;
110 }
111 }
112
113 if (rc == CS_OK) {
114 retries = 0;
115 crm_trace("Performing lookup");
116 cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
117 }
118
119 if (rc != CS_OK) {
120 crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
121 }
122
123 bail:
124 if(handle == 0) {
125 crm_trace("Closing connection");
126 cpg_finalize(local_handle);
127 }
128 crm_debug("Local nodeid is %u", local_nodeid);
129 return local_nodeid;
130 }
131
132
133 GListPtr cs_message_queue = NULL;
134 int cs_message_timer = 0;
135
136 static ssize_t crm_cs_flush(gpointer data);
137
138 static gboolean
139 crm_cs_flush_cb(gpointer data)
140 {
141 cs_message_timer = 0;
142 crm_cs_flush(data);
143 return FALSE;
144 }
145
146 #define CS_SEND_MAX 200
147 static ssize_t
148 crm_cs_flush(gpointer data)
149 {
150 int sent = 0;
151 ssize_t rc = 0;
152 int queue_len = 0;
153 static unsigned int last_sent = 0;
154 cpg_handle_t *handle = (cpg_handle_t *)data;
155
156 if (*handle == 0) {
157 crm_trace("Connection is dead");
158 return pcmk_ok;
159 }
160
161 queue_len = g_list_length(cs_message_queue);
162 if ((queue_len % 1000) == 0 && queue_len > 1) {
163 crm_err("CPG queue has grown to %d", queue_len);
164
165 } else if (queue_len == CS_SEND_MAX) {
166 crm_warn("CPG queue has grown to %d", queue_len);
167 }
168
169 if (cs_message_timer) {
170
171 crm_trace("Timer active %d", cs_message_timer);
172 return pcmk_ok;
173 }
174
175 while (cs_message_queue && sent < CS_SEND_MAX) {
176 struct iovec *iov = cs_message_queue->data;
177
178 errno = 0;
179 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
180
181 if (rc != CS_OK) {
182 break;
183 }
184
185 sent++;
186 last_sent++;
187 crm_trace("CPG message sent, size=%llu",
188 (unsigned long long) iov->iov_len);
189
190 cs_message_queue = g_list_remove(cs_message_queue, iov);
191 free(iov->iov_base);
192 free(iov);
193 }
194
195 queue_len -= sent;
196 if (sent > 1 || cs_message_queue) {
197 crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
198 sent, queue_len, last_sent, ais_error2text(rc),
199 (long long) rc);
200 } else {
201 crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
202 sent, queue_len, last_sent, ais_error2text(rc),
203 (long long) rc);
204 }
205
206 if (cs_message_queue) {
207 uint32_t delay_ms = 100;
208 if(rc != CS_OK) {
209
210 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
211 }
212 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
213 }
214
215 return rc;
216 }
217
218 gboolean
219 send_cpg_iov(struct iovec * iov)
220 {
221 static unsigned int queued = 0;
222
223 queued++;
224 crm_trace("Queueing CPG message %u (%llu bytes)",
225 queued, (unsigned long long) iov->iov_len);
226 cs_message_queue = g_list_append(cs_message_queue, iov);
227 crm_cs_flush(&pcmk_cpg_handle);
228 return TRUE;
229 }
230
231 static int
232 pcmk_cpg_dispatch(gpointer user_data)
233 {
234 int rc = 0;
235 crm_cluster_t *cluster = (crm_cluster_t*) user_data;
236
237 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
238 if (rc != CS_OK) {
239 crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
240 cpg_finalize(cluster->cpg_handle);
241 cluster->cpg_handle = 0;
242 return -1;
243
244 } else if(cpg_evicted) {
245 crm_err("Evicted from CPG membership");
246 return -1;
247 }
248 return 0;
249 }
250
251 char *
252 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
253 uint32_t *kind, const char **from)
254 {
255 char *data = NULL;
256 AIS_Message *msg = (AIS_Message *) content;
257
258 if(handle) {
259
260 uint32_t local_nodeid = get_local_nodeid(handle);
261 const char *local_name = get_local_node_name();
262
263 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
264 crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
265 return NULL;
266
267 } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
268
269 crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
270 return NULL;
271 } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
272
273 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
274 return NULL;
275 }
276
277 msg->sender.id = nodeid;
278 if (msg->sender.size == 0) {
279 crm_node_t *peer = crm_get_peer(nodeid, NULL);
280
281 if (peer == NULL) {
282 crm_err("Peer with nodeid=%u is unknown", nodeid);
283
284 } else if (peer->uname == NULL) {
285 crm_err("No uname for peer with nodeid=%u", nodeid);
286
287 } else {
288 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
289 msg->sender.size = strlen(peer->uname);
290 memset(msg->sender.uname, 0, MAX_NAME);
291 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
292 }
293 }
294 }
295
296 crm_trace("Got new%s message (size=%d, %d, %d)",
297 msg->is_compressed ? " compressed" : "",
298 ais_data_len(msg), msg->size, msg->compressed_size);
299
300 if (kind != NULL) {
301 *kind = msg->header.id;
302 }
303 if (from != NULL) {
304 *from = msg->sender.uname;
305 }
306
307 if (msg->is_compressed && msg->size > 0) {
308 int rc = BZ_OK;
309 char *uncompressed = NULL;
310 unsigned int new_size = msg->size + 1;
311
312 if (check_message_sanity(msg, NULL) == FALSE) {
313 goto badmsg;
314 }
315
316 crm_trace("Decompressing message data");
317 uncompressed = calloc(1, new_size);
318 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
319
320 if (rc != BZ_OK) {
321 crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
322 bz2_strerror(rc), rc);
323 free(uncompressed);
324 goto badmsg;
325 }
326
327 CRM_ASSERT(rc == BZ_OK);
328 CRM_ASSERT(new_size == msg->size);
329
330 data = uncompressed;
331
332 } else if (check_message_sanity(msg, data) == FALSE) {
333 goto badmsg;
334
335 } else if (pcmk__str_eq("identify", data, pcmk__str_casei)) {
336 char *pid_s = pcmk__getpid_s();
337
338 send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
339 free(pid_s);
340 return NULL;
341
342 } else {
343 data = strdup(msg->data);
344 }
345
346
347 crm_get_peer(msg->sender.id, msg->sender.uname);
348
349 crm_trace("Payload: %.200s", data);
350 return data;
351
352 badmsg:
353 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
354 " min=%d, total=%d, size=%d, bz2_size=%d",
355 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
356 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
357 msg->sender.pid, (int)sizeof(AIS_Message),
358 msg->header.size, msg->size, msg->compressed_size);
359
360 free(data);
361 return NULL;
362 }
363
364 static int cmp_member_list_nodeid(const void *first,
365 const void *second)
366 {
367 const struct cpg_address *const a = *((const struct cpg_address **) first),
368 *const b = *((const struct cpg_address **) second);
369 if (a->nodeid < b->nodeid) {
370 return -1;
371 } else if (a->nodeid > b->nodeid) {
372 return 1;
373 }
374
375 return 0;
376 }
377
378 static const char *
379 cpgreason2str(cpg_reason_t reason)
380 {
381 switch (reason) {
382 case CPG_REASON_JOIN: return " via cpg_join";
383 case CPG_REASON_LEAVE: return " via cpg_leave";
384 case CPG_REASON_NODEDOWN: return " via cluster exit";
385 case CPG_REASON_NODEUP: return " via cluster join";
386 case CPG_REASON_PROCDOWN: return " for unknown reason";
387 default: break;
388 }
389 return "";
390 }
391
392 static inline const char *
393 peer_name(crm_node_t *peer)
394 {
395 if (peer == NULL) {
396 return "unknown node";
397 } else if (peer->uname == NULL) {
398 return "peer node";
399 } else {
400 return peer->uname;
401 }
402 }
403
404 void
405 pcmk_cpg_membership(cpg_handle_t handle,
406 const struct cpg_name *groupName,
407 const struct cpg_address *member_list, size_t member_list_entries,
408 const struct cpg_address *left_list, size_t left_list_entries,
409 const struct cpg_address *joined_list, size_t joined_list_entries)
410 {
411 int i;
412 gboolean found = FALSE;
413 static int counter = 0;
414 uint32_t local_nodeid = get_local_nodeid(handle);
415 const struct cpg_address *key, **sorted;
416
417 sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
418 CRM_ASSERT(sorted != NULL);
419
420 for (size_t iter = 0; iter < member_list_entries; iter++) {
421 sorted[iter] = member_list + iter;
422 }
423
424 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
425 cmp_member_list_nodeid);
426
427 for (i = 0; i < left_list_entries; i++) {
428 crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
429 const struct cpg_address **rival = NULL;
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446 if (peer) {
447 key = &left_list[i];
448 rival = bsearch(&key, sorted, member_list_entries,
449 sizeof(const struct cpg_address *),
450 cmp_member_list_nodeid);
451 }
452
453 if (rival == NULL) {
454 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
455 groupName->value, counter, peer_name(peer),
456 left_list[i].nodeid, left_list[i].pid,
457 cpgreason2str(left_list[i].reason));
458 if (peer) {
459 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
460 OFFLINESTATUS);
461 }
462 } else if (left_list[i].nodeid == local_nodeid) {
463 crm_warn("Group %s event %d: duplicate local pid %u left%s",
464 groupName->value, counter,
465 left_list[i].pid, cpgreason2str(left_list[i].reason));
466 } else {
467 crm_warn("Group %s event %d: "
468 "%s (node %u) duplicate pid %u left%s (%u remains)",
469 groupName->value, counter, peer_name(peer),
470 left_list[i].nodeid, left_list[i].pid,
471 cpgreason2str(left_list[i].reason), (*rival)->pid);
472 }
473 }
474 free(sorted);
475 sorted = NULL;
476
477 for (i = 0; i < joined_list_entries; i++) {
478 crm_info("Group %s event %d: node %u pid %u joined%s",
479 groupName->value, counter, joined_list[i].nodeid,
480 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
481 }
482
483 for (i = 0; i < member_list_entries; i++) {
484 crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
485
486 if (member_list[i].nodeid == local_nodeid
487 && member_list[i].pid != getpid()) {
488
489 crm_warn("Group %s event %d: detected duplicate local pid %u",
490 groupName->value, counter, member_list[i].pid);
491 continue;
492 }
493 crm_info("Group %s event %d: %s (node %u pid %u) is member",
494 groupName->value, counter, peer_name(peer),
495 member_list[i].nodeid, member_list[i].pid);
496
497
498
499
500 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
501 ONLINESTATUS);
502
503 if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
504
505
506
507
508
509
510
511 time_t now = time(NULL);
512
513 if (peer->when_lost == 0) {
514
515 peer->when_lost = now;
516
517 } else if (now > (peer->when_lost + 60)) {
518
519 crm_warn("Node %u is member of group %s but was believed offline",
520 member_list[i].nodeid, groupName->value);
521 crm_update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
522 }
523 }
524
525 if (local_nodeid == member_list[i].nodeid) {
526 found = TRUE;
527 }
528 }
529
530 if (!found) {
531 crm_err("Local node was evicted from group %s", groupName->value);
532 cpg_evicted = TRUE;
533 }
534
535 counter++;
536 }
537
538 gboolean
539 cluster_connect_cpg(crm_cluster_t *cluster)
540 {
541 cs_error_t rc;
542 int fd = -1;
543 int retries = 0;
544 uint32_t id = 0;
545 crm_node_t *peer = NULL;
546 cpg_handle_t handle = 0;
547 const char *message_name = pcmk__message_name(crm_system_name);
548 uid_t found_uid = 0;
549 gid_t found_gid = 0;
550 pid_t found_pid = 0;
551 int rv;
552
553 struct mainloop_fd_callbacks cpg_fd_callbacks = {
554 .dispatch = pcmk_cpg_dispatch,
555 .destroy = cluster->destroy,
556 };
557
558 cpg_callbacks_t cpg_callbacks = {
559 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
560 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
561
562
563 };
564
565 cpg_evicted = FALSE;
566 cluster->group.length = 0;
567 cluster->group.value[0] = 0;
568
569
570 strncpy(cluster->group.value, message_name, 127);
571 cluster->group.value[127] = 0;
572 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
573
574 cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
575 if (rc != CS_OK) {
576 crm_err("Could not connect to the CPG API: %s (%d)",
577 cs_strerror(rc), rc);
578 goto bail;
579 }
580
581 rc = cpg_fd_get(handle, &fd);
582 if (rc != CS_OK) {
583 crm_err("Could not obtain the CPG API connection: %s (%d)",
584 cs_strerror(rc), rc);
585 goto bail;
586 }
587
588
589 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
590 &found_uid, &found_gid))) {
591 crm_err("CPG provider is not authentic:"
592 " process %lld (uid: %lld, gid: %lld)",
593 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
594 (long long) found_uid, (long long) found_gid);
595 rc = CS_ERR_ACCESS;
596 goto bail;
597 } else if (rv < 0) {
598 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
599 strerror(-rv), -rv);
600 rc = CS_ERR_ACCESS;
601 goto bail;
602 }
603
604 id = get_local_nodeid(handle);
605 if (id == 0) {
606 crm_err("Could not get local node id from the CPG API");
607 goto bail;
608
609 }
610 cluster->nodeid = id;
611
612 retries = 0;
613 cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
614 if (rc != CS_OK) {
615 crm_err("Could not join the CPG group '%s': %d", message_name, rc);
616 goto bail;
617 }
618
619 pcmk_cpg_handle = handle;
620 cluster->cpg_handle = handle;
621 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
622
623 bail:
624 if (rc != CS_OK) {
625 cpg_finalize(handle);
626 return FALSE;
627 }
628
629 peer = crm_get_peer(id, NULL);
630 crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
631 return TRUE;
632 }
633
634 gboolean
635 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
636 {
637 gboolean rc = TRUE;
638 char *data = NULL;
639
640 data = dump_xml_unformatted(msg);
641 rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
642 free(data);
643 return rc;
644 }
645
646 gboolean
647 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
648 gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
649 {
650 static int msg_id = 0;
651 static int local_pid = 0;
652 static int local_name_len = 0;
653 static const char *local_name = NULL;
654
655 char *target = NULL;
656 struct iovec *iov;
657 AIS_Message *msg = NULL;
658 enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
659
660 switch (msg_class) {
661 case crm_class_cluster:
662 break;
663 default:
664 crm_err("Invalid message class: %d", msg_class);
665 return FALSE;
666 }
667
668 CRM_CHECK(dest != crm_msg_ais, return FALSE);
669
670 if(local_name == NULL) {
671 local_name = get_local_node_name();
672 }
673 if(local_name_len == 0 && local_name) {
674 local_name_len = strlen(local_name);
675 }
676
677 if (data == NULL) {
678 data = "";
679 }
680
681 if (local_pid == 0) {
682 local_pid = getpid();
683 }
684
685 if (sender == crm_msg_none) {
686 sender = local_pid;
687 }
688
689 msg = calloc(1, sizeof(AIS_Message));
690
691 msg_id++;
692 msg->id = msg_id;
693 msg->header.id = msg_class;
694 msg->header.error = CS_OK;
695
696 msg->host.type = dest;
697 msg->host.local = local;
698
699 if (node) {
700 if (node->uname) {
701 target = strdup(node->uname);
702 msg->host.size = strlen(node->uname);
703 memset(msg->host.uname, 0, MAX_NAME);
704 memcpy(msg->host.uname, node->uname, msg->host.size);
705 } else {
706 target = crm_strdup_printf("%u", node->id);
707 }
708 msg->host.id = node->id;
709 } else {
710 target = strdup("all");
711 }
712
713 msg->sender.id = 0;
714 msg->sender.type = sender;
715 msg->sender.pid = local_pid;
716 msg->sender.size = local_name_len;
717 memset(msg->sender.uname, 0, MAX_NAME);
718 if(local_name && msg->sender.size) {
719 memcpy(msg->sender.uname, local_name, msg->sender.size);
720 }
721
722 msg->size = 1 + strlen(data);
723 msg->header.size = sizeof(AIS_Message) + msg->size;
724
725 if (msg->size < CRM_BZ2_THRESHOLD) {
726 msg = pcmk__realloc(msg, msg->header.size);
727 memcpy(msg->data, data, msg->size);
728
729 } else {
730 char *compressed = NULL;
731 unsigned int new_size = 0;
732 char *uncompressed = strdup(data);
733
734 if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
735 &compressed, &new_size) == pcmk_rc_ok) {
736
737 msg->header.size = sizeof(AIS_Message) + new_size;
738 msg = pcmk__realloc(msg, msg->header.size);
739 memcpy(msg->data, compressed, new_size);
740
741 msg->is_compressed = TRUE;
742 msg->compressed_size = new_size;
743
744 } else {
745
746
747 msg = pcmk__realloc(msg, msg->header.size);
748 memcpy(msg->data, data, msg->size);
749 }
750
751 free(uncompressed);
752 free(compressed);
753 }
754
755 iov = calloc(1, sizeof(struct iovec));
756 iov->iov_base = msg;
757 iov->iov_len = msg->header.size;
758
759 if (msg->compressed_size) {
760 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
761 msg->id, target, (unsigned long long) iov->iov_len,
762 msg->compressed_size, data);
763 } else {
764 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
765 msg->id, target, (unsigned long long) iov->iov_len,
766 msg->size, data);
767 }
768 free(target);
769
770 send_cpg_iov(iov);
771
772 return TRUE;
773 }
774
775 enum crm_ais_msg_types
776 text2msg_type(const char *text)
777 {
778 int type = crm_msg_none;
779
780 CRM_CHECK(text != NULL, return type);
781 text = pcmk__message_name(text);
782 if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
783 type = crm_msg_ais;
784 } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
785 type = crm_msg_cib;
786 } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
787 type = crm_msg_crmd;
788 } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
789 type = crm_msg_te;
790 } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
791 type = crm_msg_pe;
792 } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
793 type = crm_msg_lrmd;
794 } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
795 type = crm_msg_stonithd;
796 } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
797 type = crm_msg_stonith_ng;
798 } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
799 type = crm_msg_attrd;
800
801 } else {
802
803
804
805 int scan_rc = sscanf(text, "%d", &type);
806
807 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
808
809 type = crm_msg_none;
810 }
811 }
812 return type;
813 }