This source file includes the following definitions.
- cluster_disconnect_cpg
- get_local_nodeid
- crm_cs_flush_cb
- crm_cs_flush
- send_cpg_iov
- pcmk_cpg_dispatch
- pcmk_message_common_cs
- pcmk_cpg_membership
- cluster_connect_cpg
- send_cluster_message_cs
- send_cluster_text
- text2msg_type
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <crm_internal.h>
20 #include <bzlib.h>
21 #include <sys/socket.h>
22 #include <netinet/in.h>
23 #include <arpa/inet.h>
24 #include <netdb.h>
25
26 #include <crm/common/ipc.h>
27 #include <crm/cluster/internal.h>
28 #include <crm/common/mainloop.h>
29 #include <sys/utsname.h>
30
31 #include <qb/qbipcc.h>
32 #include <qb/qbutil.h>
33
34 #include <corosync/corodefs.h>
35 #include <corosync/corotypes.h>
36 #include <corosync/hdb.h>
37 #include <corosync/cpg.h>
38
39 #include <crm/msg_xml.h>
40
/* Handle of the CPG connection used for sending. cluster_connect_cpg() stores
 * the live handle here, cluster_disconnect_cpg() resets it to 0, and
 * crm_cs_flush() treats 0 as "connection is dead". */
cpg_handle_t pcmk_cpg_handle = 0;

/* Set by pcmk_cpg_membership() when the local node is missing from the member
 * list; pcmk_cpg_dispatch() then reports failure. Reset on every connect. */
static bool cpg_evicted = FALSE;
/* Optional message dispatch hook; it is not referenced in the visible part of
 * this file (presumably set and used by callers elsewhere). */
gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
45
/*
 * Retry helper for corosync calls.
 *
 * Executes `code`, which is expected to assign its result to a variable named
 * `rc` in the enclosing scope.  While `rc` is CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL, `counter` is incremented, the process sleeps for
 * `counter` seconds, and `code` is run again, until `counter` reaches `max`.
 * Any other value of `rc` ends the loop immediately.
 *
 * `counter` must be a modifiable lvalue; callers reset it to 0 before each use.
 * Arguments are parenthesized so that expressions can be passed safely.
 */
#define cs_repeat(counter, max, code) do {                              \
        code;                                                           \
        if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {        \
            (counter)++;                                                \
            crm_debug("Retrying operation after %ds", (counter));       \
            sleep(counter);                                             \
        } else {                                                        \
            break;                                                      \
        }                                                               \
    } while ((counter) < (max))
56
57 void
58 cluster_disconnect_cpg(crm_cluster_t *cluster)
59 {
60 pcmk_cpg_handle = 0;
61 if (cluster->cpg_handle) {
62 crm_trace("Disconnecting CPG");
63 cpg_leave(cluster->cpg_handle, &cluster->group);
64 cpg_finalize(cluster->cpg_handle);
65 cluster->cpg_handle = 0;
66
67 } else {
68 crm_info("No CPG connection");
69 }
70 }
71
72 uint32_t get_local_nodeid(cpg_handle_t handle)
73 {
74 int rc = CS_OK;
75 int retries = 0;
76 static uint32_t local_nodeid = 0;
77 cpg_handle_t local_handle = handle;
78 cpg_callbacks_t cb = { };
79
80 if(local_nodeid != 0) {
81 return local_nodeid;
82 }
83
84 #if 0
85
86 if(get_cluster_type() == pcmk_cluster_classic_ais) {
87 get_ais_details(&local_nodeid, NULL);
88 goto done;
89 }
90 #endif
91
92 if(handle == 0) {
93 crm_trace("Creating connection");
94 cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
95 }
96
97 if (rc == CS_OK) {
98 retries = 0;
99 crm_trace("Performing lookup");
100 cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
101 }
102
103 if (rc != CS_OK) {
104 crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
105 }
106 if(handle == 0) {
107 crm_trace("Closing connection");
108 cpg_finalize(local_handle);
109 }
110 crm_debug("Local nodeid is %u", local_nodeid);
111 return local_nodeid;
112 }
113
114
/* FIFO of `struct iovec *` messages waiting to be multicast: send_cpg_iov()
 * appends to it and crm_cs_flush() sends and frees entries from the head. */
GListPtr cs_message_queue = NULL;
/* GLib timeout source id of a pending deferred flush, or 0 when none is
 * scheduled; set in crm_cs_flush() and cleared in crm_cs_flush_cb(). */
int cs_message_timer = 0;

/* Forward declaration: crm_cs_flush_cb() below calls it before its definition */
static ssize_t crm_cs_flush(gpointer data);
119
120 static gboolean
121 crm_cs_flush_cb(gpointer data)
122 {
123 cs_message_timer = 0;
124 crm_cs_flush(data);
125 return FALSE;
126 }
127
128 #define CS_SEND_MAX 200
129 static ssize_t
130 crm_cs_flush(gpointer data)
131 {
132 int sent = 0;
133 ssize_t rc = 0;
134 int queue_len = 0;
135 static unsigned int last_sent = 0;
136 cpg_handle_t *handle = (cpg_handle_t *)data;
137
138 if (*handle == 0) {
139 crm_trace("Connection is dead");
140 return pcmk_ok;
141 }
142
143 queue_len = g_list_length(cs_message_queue);
144 if ((queue_len % 1000) == 0 && queue_len > 1) {
145 crm_err("CPG queue has grown to %d", queue_len);
146
147 } else if (queue_len == CS_SEND_MAX) {
148 crm_warn("CPG queue has grown to %d", queue_len);
149 }
150
151 if (cs_message_timer) {
152
153 crm_trace("Timer active %d", cs_message_timer);
154 return pcmk_ok;
155 }
156
157 while (cs_message_queue && sent < CS_SEND_MAX) {
158 struct iovec *iov = cs_message_queue->data;
159
160 errno = 0;
161 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
162
163 if (rc != CS_OK) {
164 break;
165 }
166
167 sent++;
168 last_sent++;
169 crm_trace("CPG message sent, size=%llu",
170 (unsigned long long) iov->iov_len);
171
172 cs_message_queue = g_list_remove(cs_message_queue, iov);
173 free(iov->iov_base);
174 free(iov);
175 }
176
177 queue_len -= sent;
178 if (sent > 1 || cs_message_queue) {
179 crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
180 sent, queue_len, last_sent, ais_error2text(rc),
181 (long long) rc);
182 } else {
183 crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
184 sent, queue_len, last_sent, ais_error2text(rc),
185 (long long) rc);
186 }
187
188 if (cs_message_queue) {
189 uint32_t delay_ms = 100;
190 if(rc != CS_OK) {
191
192 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
193 }
194 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
195 }
196
197 return rc;
198 }
199
200 gboolean
201 send_cpg_iov(struct iovec * iov)
202 {
203 static unsigned int queued = 0;
204
205 queued++;
206 crm_trace("Queueing CPG message %u (%llu bytes)",
207 queued, (unsigned long long) iov->iov_len);
208 cs_message_queue = g_list_append(cs_message_queue, iov);
209 crm_cs_flush(&pcmk_cpg_handle);
210 return TRUE;
211 }
212
213 static int
214 pcmk_cpg_dispatch(gpointer user_data)
215 {
216 int rc = 0;
217 crm_cluster_t *cluster = (crm_cluster_t*) user_data;
218
219 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
220 if (rc != CS_OK) {
221 crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
222 cluster->cpg_handle = 0;
223 return -1;
224
225 } else if(cpg_evicted) {
226 crm_err("Evicted from CPG membership");
227 return -1;
228 }
229 return 0;
230 }
231
232 char *
233 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
234 uint32_t *kind, const char **from)
235 {
236 char *data = NULL;
237 AIS_Message *msg = (AIS_Message *) content;
238
239 if(handle) {
240
241
242
243 uint32_t local_nodeid = get_local_nodeid(handle);
244 const char *local_name = get_local_node_name();
245
246 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
247 crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
248 return NULL;
249
250 } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
251
252 crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
253 return NULL;
254 } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
255
256 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
257 return NULL;
258 }
259
260 msg->sender.id = nodeid;
261 if (msg->sender.size == 0) {
262 crm_node_t *peer = crm_get_peer(nodeid, NULL);
263
264 if (peer == NULL) {
265 crm_err("Peer with nodeid=%u is unknown", nodeid);
266
267 } else if (peer->uname == NULL) {
268 crm_err("No uname for peer with nodeid=%u", nodeid);
269
270 } else {
271 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
272 msg->sender.size = strlen(peer->uname);
273 memset(msg->sender.uname, 0, MAX_NAME);
274 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
275 }
276 }
277 }
278
279 crm_trace("Got new%s message (size=%d, %d, %d)",
280 msg->is_compressed ? " compressed" : "",
281 ais_data_len(msg), msg->size, msg->compressed_size);
282
283 if (kind != NULL) {
284 *kind = msg->header.id;
285 }
286 if (from != NULL) {
287 *from = msg->sender.uname;
288 }
289
290 if (msg->is_compressed && msg->size > 0) {
291 int rc = BZ_OK;
292 char *uncompressed = NULL;
293 unsigned int new_size = msg->size + 1;
294
295 if (check_message_sanity(msg, NULL) == FALSE) {
296 goto badmsg;
297 }
298
299 crm_trace("Decompressing message data");
300 uncompressed = calloc(1, new_size);
301 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
302
303 if (rc != BZ_OK) {
304 crm_err("Decompression failed: %d", rc);
305 free(uncompressed);
306 goto badmsg;
307 }
308
309 CRM_ASSERT(rc == BZ_OK);
310 CRM_ASSERT(new_size == msg->size);
311
312 data = uncompressed;
313
314 } else if (check_message_sanity(msg, data) == FALSE) {
315 goto badmsg;
316
317 } else if (safe_str_eq("identify", data)) {
318 char *pid_s = crm_getpid_s();
319
320 send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
321 free(pid_s);
322 return NULL;
323
324 } else {
325 data = strdup(msg->data);
326 }
327
328 if (msg->header.id != crm_class_members) {
329
330 crm_get_peer(msg->sender.id, msg->sender.uname);
331 }
332
333 if (msg->header.id == crm_class_rmpeer) {
334 uint32_t id = crm_int_helper(data, NULL);
335
336 crm_info("Removing peer %s/%u", data, id);
337 reap_crm_member(id, NULL);
338 free(data);
339 return NULL;
340
341 #if SUPPORT_PLUGIN
342 } else if (is_classic_ais_cluster()) {
343 plugin_handle_membership(msg);
344 #endif
345 }
346
347 crm_trace("Payload: %.200s", data);
348 return data;
349
350 badmsg:
351 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
352 " min=%d, total=%d, size=%d, bz2_size=%d",
353 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
354 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
355 msg->sender.pid, (int)sizeof(AIS_Message),
356 msg->header.size, msg->size, msg->compressed_size);
357
358 free(data);
359 return NULL;
360 }
361
362 void
363 pcmk_cpg_membership(cpg_handle_t handle,
364 const struct cpg_name *groupName,
365 const struct cpg_address *member_list, size_t member_list_entries,
366 const struct cpg_address *left_list, size_t left_list_entries,
367 const struct cpg_address *joined_list, size_t joined_list_entries)
368 {
369 int i;
370 gboolean found = FALSE;
371 static int counter = 0;
372 uint32_t local_nodeid = get_local_nodeid(handle);
373
374 for (i = 0; i < left_list_entries; i++) {
375 crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
376
377 crm_info("Node %u left group %s (peer=%s, counter=%d.%d)",
378 left_list[i].nodeid, groupName->value,
379 (peer? peer->uname : "<none>"), counter, i);
380 if (peer) {
381 crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
382 }
383 }
384
385 for (i = 0; i < joined_list_entries; i++) {
386 crm_info("Node %u joined group %s (counter=%d.%d)",
387 joined_list[i].nodeid, groupName->value, counter, i);
388 }
389
390 for (i = 0; i < member_list_entries; i++) {
391 crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
392
393 crm_info("Node %u still member of group %s (peer=%s, counter=%d.%d)",
394 member_list[i].nodeid, groupName->value,
395 (peer? peer->uname : "<none>"), counter, i);
396
397
398
399
400
401 peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
402 if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
403 time_t now = time(NULL);
404
405
406 if(peer->votes == 0) {
407 peer->votes = now;
408
409 } else if(now > (60 + peer->votes)) {
410
411
412
413
414
415 crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
416 if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) {
417 peer->votes = 0;
418 }
419 }
420 }
421
422 if (local_nodeid == member_list[i].nodeid) {
423 found = TRUE;
424 }
425 }
426
427 if (!found) {
428 crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
429 cpg_evicted = TRUE;
430 }
431
432 counter++;
433 }
434
435 gboolean
436 cluster_connect_cpg(crm_cluster_t *cluster)
437 {
438 int rc = -1;
439 int fd = 0;
440 int retries = 0;
441 uint32_t id = 0;
442 crm_node_t *peer = NULL;
443 cpg_handle_t handle = 0;
444
445 struct mainloop_fd_callbacks cpg_fd_callbacks = {
446 .dispatch = pcmk_cpg_dispatch,
447 .destroy = cluster->destroy,
448 };
449
450 cpg_callbacks_t cpg_callbacks = {
451 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
452 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
453
454
455 };
456
457 cpg_evicted = FALSE;
458 cluster->group.length = 0;
459 cluster->group.value[0] = 0;
460
461
462 strncpy(cluster->group.value, crm_system_name?crm_system_name:"unknown", 127);
463 cluster->group.value[127] = 0;
464 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
465
466 cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
467 if (rc != CS_OK) {
468 crm_err("Could not connect to the Cluster Process Group API: %d", rc);
469 goto bail;
470 }
471
472 id = get_local_nodeid(handle);
473 if (id == 0) {
474 crm_err("Could not get local node id from the CPG API");
475 goto bail;
476
477 }
478 cluster->nodeid = id;
479
480 retries = 0;
481 cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
482 if (rc != CS_OK) {
483 crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
484 goto bail;
485 }
486
487 rc = cpg_fd_get(handle, &fd);
488 if (rc != CS_OK) {
489 crm_err("Could not obtain the CPG API connection: %d", rc);
490 goto bail;
491 }
492
493 pcmk_cpg_handle = handle;
494 cluster->cpg_handle = handle;
495 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
496
497 bail:
498 if (rc != CS_OK) {
499 cpg_finalize(handle);
500 return FALSE;
501 }
502
503 peer = crm_get_peer(id, NULL);
504 crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
505 return TRUE;
506 }
507
508 gboolean
509 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
510 {
511 gboolean rc = TRUE;
512 char *data = NULL;
513
514 data = dump_xml_unformatted(msg);
515 rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
516 free(data);
517 return rc;
518 }
519
520 gboolean
521 send_cluster_text(int class, const char *data,
522 gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
523 {
524 static int msg_id = 0;
525 static int local_pid = 0;
526 static int local_name_len = 0;
527 static const char *local_name = NULL;
528
529 char *target = NULL;
530 struct iovec *iov;
531 AIS_Message *msg = NULL;
532 enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
533
534
535 CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
536 return FALSE);
537
538 #if !SUPPORT_PLUGIN
539 CRM_CHECK(dest != crm_msg_ais, return FALSE);
540 #endif
541
542 if(local_name == NULL) {
543 local_name = get_local_node_name();
544 }
545 if(local_name_len == 0 && local_name) {
546 local_name_len = strlen(local_name);
547 }
548
549 if (data == NULL) {
550 data = "";
551 }
552
553 if (local_pid == 0) {
554 local_pid = getpid();
555 }
556
557 if (sender == crm_msg_none) {
558 sender = local_pid;
559 }
560
561 msg = calloc(1, sizeof(AIS_Message));
562
563 msg_id++;
564 msg->id = msg_id;
565 msg->header.id = class;
566 msg->header.error = CS_OK;
567
568 msg->host.type = dest;
569 msg->host.local = local;
570
571 if (node) {
572 if (node->uname) {
573 target = strdup(node->uname);
574 msg->host.size = strlen(node->uname);
575 memset(msg->host.uname, 0, MAX_NAME);
576 memcpy(msg->host.uname, node->uname, msg->host.size);
577 } else {
578 target = crm_strdup_printf("%u", node->id);
579 }
580 msg->host.id = node->id;
581 } else {
582 target = strdup("all");
583 }
584
585 msg->sender.id = 0;
586 msg->sender.type = sender;
587 msg->sender.pid = local_pid;
588 msg->sender.size = local_name_len;
589 memset(msg->sender.uname, 0, MAX_NAME);
590 if(local_name && msg->sender.size) {
591 memcpy(msg->sender.uname, local_name, msg->sender.size);
592 }
593
594 msg->size = 1 + strlen(data);
595 msg->header.size = sizeof(AIS_Message) + msg->size;
596
597 if (msg->size < CRM_BZ2_THRESHOLD) {
598 msg = realloc_safe(msg, msg->header.size);
599 memcpy(msg->data, data, msg->size);
600
601 } else {
602 char *compressed = NULL;
603 unsigned int new_size = 0;
604 char *uncompressed = strdup(data);
605
606 if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
607
608 msg->header.size = sizeof(AIS_Message) + new_size;
609 msg = realloc_safe(msg, msg->header.size);
610 memcpy(msg->data, compressed, new_size);
611
612 msg->is_compressed = TRUE;
613 msg->compressed_size = new_size;
614
615 } else {
616 msg = realloc_safe(msg, msg->header.size);
617 memcpy(msg->data, data, msg->size);
618 }
619
620 free(uncompressed);
621 free(compressed);
622 }
623
624 iov = calloc(1, sizeof(struct iovec));
625 iov->iov_base = msg;
626 iov->iov_len = msg->header.size;
627
628 if (msg->compressed_size) {
629 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
630 msg->id, target, (unsigned long long) iov->iov_len,
631 msg->compressed_size, data);
632 } else {
633 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
634 msg->id, target, (unsigned long long) iov->iov_len,
635 msg->size, data);
636 }
637 free(target);
638
639 #if SUPPORT_PLUGIN
640
641 if(get_cluster_type() == pcmk_cluster_classic_ais) {
642 return send_plugin_text(class, iov);
643 }
644 #endif
645
646 send_cpg_iov(iov);
647
648 return TRUE;
649 }
650
651 enum crm_ais_msg_types
652 text2msg_type(const char *text)
653 {
654 int type = crm_msg_none;
655
656 CRM_CHECK(text != NULL, return type);
657 if (safe_str_eq(text, "ais")) {
658 type = crm_msg_ais;
659 } else if (safe_str_eq(text, "crm_plugin")) {
660 type = crm_msg_ais;
661 } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
662 type = crm_msg_cib;
663 } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
664 type = crm_msg_crmd;
665 } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
666 type = crm_msg_crmd;
667 } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
668 type = crm_msg_te;
669 } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
670 type = crm_msg_pe;
671 } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
672 type = crm_msg_lrmd;
673 } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
674 type = crm_msg_stonithd;
675 } else if (safe_str_eq(text, "stonith-ng")) {
676 type = crm_msg_stonith_ng;
677 } else if (safe_str_eq(text, "attrd")) {
678 type = crm_msg_attrd;
679
680 } else {
681
682
683
684 int scan_rc = sscanf(text, "%d", &type);
685
686 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
687
688 type = crm_msg_none;
689 }
690 }
691 return type;
692 }