12 #include <arpa/inet.h> 15 #include <netinet/in.h> 18 #include <sys/socket.h> 19 #include <sys/types.h> 20 #include <sys/utsname.h> 23 #include <corosync/corodefs.h> 24 #include <corosync/corotypes.h> 25 #include <corosync/hdb.h> 26 #include <corosync/cpg.h> 27 #include <qb/qbipc_common.h> 28 #include <qb/qbipcc.h> 29 #include <qb/qbutil.h> 42 static cpg_handle_t pcmk_cpg_handle = 0;
45 static bool cpg_evicted =
false;
46 static GList *cs_message_queue = NULL;
47 static int cs_message_timer = 0;
49 struct pcmk__cpg_host_s {
60 struct pcmk__cpg_msg_s {
61 struct qb_ipc_response_header header
__attribute__ ((aligned(8)));
77 static void crm_cs_flush(gpointer
data);
79 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size) 81 #define cs_repeat(rc, counter, max, code) do { \ 83 if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \ 85 crm_debug("Retrying operation after %ds", counter); \ 90 } while (counter < max) 103 cs_error_t rc = CS_OK;
105 static uint32_t local_nodeid = 0;
106 cpg_handle_t local_handle = handle;
107 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
114 if (local_nodeid != 0) {
121 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
122 (cpg_model_data_t *) &cpg_model_info,
125 crm_err(
"Could not connect to the CPG API: %s (%d)",
126 cs_strerror(rc), rc);
130 rc = cpg_fd_get(local_handle, &fd);
132 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
133 cs_strerror(rc), rc);
139 &found_uid, &found_gid);
141 crm_err(
"CPG provider is not authentic:" 142 " process %lld (uid: %lld, gid: %lld)",
144 (
long long) found_uid, (
long long) found_gid);
148 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
157 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
161 crm_err(
"Could not get local node id from the CPG API: %s (%d)",
162 pcmk__cs_err_str(rc), rc);
168 cpg_finalize(local_handle);
170 crm_debug(
"Local nodeid is %u", local_nodeid);
183 crm_cs_flush_cb(gpointer
data)
185 cs_message_timer = 0;
191 #define CS_SEND_MAX 200 200 crm_cs_flush(gpointer
data)
202 unsigned int sent = 0;
205 cpg_handle_t *handle = (cpg_handle_t *)
data;
212 queue_len = g_list_length(cs_message_queue);
213 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
214 crm_err(
"CPG queue has grown to %d", queue_len);
217 crm_warn(
"CPG queue has grown to %d", queue_len);
220 if (cs_message_timer != 0) {
222 crm_trace(
"Timer active %d", cs_message_timer);
226 while ((cs_message_queue != NULL) && (sent <
CS_SEND_MAX)) {
227 struct iovec *iov = cs_message_queue->data;
229 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
236 (
unsigned long long) iov->iov_len);
238 cs_message_queue = g_list_remove(cs_message_queue, iov);
245 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
249 if (cs_message_queue) {
250 uint32_t delay_ms = 100;
253 delay_ms = QB_MIN(1000,
CS_SEND_MAX + (10 * queue_len));
255 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb,
data);
268 pcmk_cpg_dispatch(gpointer user_data)
270 cs_error_t rc = CS_OK;
273 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
275 crm_err(
"Connection to the CPG API failed: %s (%d)",
276 pcmk__cs_err_str(rc), rc);
277 cpg_finalize(cluster->cpg_handle);
278 cluster->cpg_handle = 0;
281 }
else if (cpg_evicted) {
282 crm_err(
"Evicted from CPG membership");
288 static inline const char *
293 }
else if (
host->size > 0) {
300 static inline const char *
303 const char *text =
"unknown";
353 if (payload_size < 1) {
354 crm_err(
"%sCPG message %d from %s invalid: " 355 "Claimed size of %d bytes is too small " 356 CRM_XS " from %s[%u] to %s@%s",
357 (msg->is_compressed?
"Compressed " :
""),
358 msg->id, ais_dest(&(msg->sender)),
359 (
int) msg->header.size,
360 msg_type2text(msg->sender.type), msg->sender.pid,
361 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
365 if (msg->header.error != CS_OK) {
366 crm_err(
"%sCPG message %d from %s invalid: " 367 "Sender indicated error %d " 368 CRM_XS " from %s[%u] to %s@%s",
369 (msg->is_compressed?
"Compressed " :
""),
370 msg->id, ais_dest(&(msg->sender)),
372 msg_type2text(msg->sender.type), msg->sender.pid,
373 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
378 crm_err(
"%sCPG message %d from %s invalid: " 379 "Total size %d inconsistent with payload size %d " 380 CRM_XS " from %s[%u] to %s@%s",
381 (msg->is_compressed?
"Compressed " :
""),
382 msg->id, ais_dest(&(msg->sender)),
384 msg_type2text(msg->sender.type), msg->sender.pid,
385 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
389 if (!msg->is_compressed &&
393 (((msg->size > 1) && (msg->data[msg->size - 2] ==
'\0'))
394 || (msg->data[msg->size - 1] !=
'\0'))) {
395 crm_err(
"CPG message %d from %s invalid: " 396 "Payload does not end at byte %llu " 397 CRM_XS " from %s[%u] to %s@%s",
398 msg->id, ais_dest(&(msg->sender)),
399 (
unsigned long long) msg->size,
400 msg_type2text(msg->sender.type), msg->sender.pid,
401 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
405 crm_trace(
"Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
406 (
int) msg->header.size, (msg->is_compressed?
"compressed " :
""),
407 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
408 ais_dest(&(msg->sender)),
409 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
433 void *content, uint32_t *kind,
const char **from)
443 if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
444 crm_err(
"Nodeid mismatch from %" PRIu32
".%" PRIu32
445 ": claimed nodeid=%" PRIu32,
446 sender_id,
pid, msg->sender.id);
449 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
450 crm_trace(
"Not for us: %" PRIu32
" != %" PRIu32,
451 msg->host.id, local_nodeid);
454 if ((msg->host.size > 0)
457 crm_trace(
"Not for us: %s != %s", msg->host.uname, local_name);
461 msg->sender.id = sender_id;
462 if (msg->sender.size == 0) {
467 if (peer->
uname == NULL) {
468 crm_err(
"No uname for peer with nodeid=%u", sender_id);
471 crm_notice(
"Fixing uname for peer with nodeid=%u", sender_id);
472 msg->sender.size = strlen(peer->
uname);
473 memset(msg->sender.uname, 0,
MAX_NAME);
474 memcpy(msg->sender.uname, peer->
uname, msg->sender.size);
479 crm_trace(
"Got new%s message (size=%d, %d, %d)",
480 msg->is_compressed ?
" compressed" :
"",
484 *kind = msg->header.id;
487 *from = msg->sender.uname;
490 if (msg->is_compressed && (msg->size > 0)) {
492 char *uncompressed = NULL;
493 unsigned int new_size = msg->size + 1;
495 if (!check_message_sanity(msg)) {
501 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
502 msg->compressed_size, 1, 0);
517 }
else if (!check_message_sanity(msg)) {
521 data = strdup(msg->data);
532 crm_err(
"Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" 533 " min=%d, total=%d, size=%d, bz2_size=%d",
534 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
535 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
537 msg->header.size, msg->size, msg->compressed_size);
555 cmp_member_list_nodeid(
const void *first,
const void *second)
557 const struct cpg_address *
const a = *((
const struct cpg_address **) first),
558 *
const b = *((
const struct cpg_address **) second);
559 if (a->nodeid < b->nodeid) {
561 }
else if (a->nodeid > b->nodeid) {
577 cpgreason2str(cpg_reason_t reason)
580 case CPG_REASON_JOIN:
return " via cpg_join";
581 case CPG_REASON_LEAVE:
return " via cpg_leave";
582 case CPG_REASON_NODEDOWN:
return " via cluster exit";
583 case CPG_REASON_NODEUP:
return " via cluster join";
584 case CPG_REASON_PROCDOWN:
return " for unknown reason";
598 static inline const char *
602 return "unknown node";
603 }
else if (peer->
uname == NULL) {
622 node_left(
const char *cpg_group_name,
int event_counter,
623 uint32_t local_nodeid,
const struct cpg_address *cpg_peer,
624 const struct cpg_address **sorted_member_list,
625 size_t member_list_entries)
630 const struct cpg_address **rival = NULL;
645 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
646 sizeof(
const struct cpg_address *),
647 cmp_member_list_nodeid);
651 crm_info(
"Group %s event %d: %s (node %u pid %u) left%s",
652 cpg_group_name, event_counter, peer_name(peer),
653 cpg_peer->nodeid, cpg_peer->pid,
654 cpgreason2str(cpg_peer->reason));
659 }
else if (cpg_peer->nodeid == local_nodeid) {
660 crm_warn(
"Group %s event %d: duplicate local pid %u left%s",
661 cpg_group_name, event_counter,
662 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
665 "%s (node %u) duplicate pid %u left%s (%u remains)",
666 cpg_group_name, event_counter, peer_name(peer),
667 cpg_peer->nodeid, cpg_peer->pid,
668 cpgreason2str(cpg_peer->reason), (*rival)->pid);
690 const struct cpg_name *group_name,
691 const struct cpg_address *member_list,
692 size_t member_list_entries,
693 const struct cpg_address *left_list,
694 size_t left_list_entries,
695 const struct cpg_address *joined_list,
696 size_t joined_list_entries)
698 static int counter = 0;
702 const struct cpg_address **sorted = NULL;
705 sizeof(
const struct cpg_address *));
707 for (
size_t iter = 0; iter < member_list_entries; iter++) {
708 sorted[iter] = member_list + iter;
712 qsort(sorted, member_list_entries,
sizeof(
const struct cpg_address *),
713 cmp_member_list_nodeid);
715 for (
int i = 0; i < left_list_entries; i++) {
716 node_left(group_name->value, counter, local_nodeid, &left_list[i],
717 sorted, member_list_entries);
722 for (
int i = 0; i < joined_list_entries; i++) {
723 crm_info(
"Group %s event %d: node %u pid %u joined%s",
724 group_name->value, counter, joined_list[i].nodeid,
725 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
728 for (
int i = 0; i < member_list_entries; i++) {
732 if (member_list[i].nodeid == local_nodeid
733 && member_list[i].
pid != getpid()) {
735 crm_warn(
"Group %s event %d: detected duplicate local pid %u",
736 group_name->value, counter, member_list[i].pid);
739 crm_info(
"Group %s event %d: %s (node %u pid %u) is member",
740 group_name->value, counter, peer_name(peer),
741 member_list[i].nodeid, member_list[i].
pid);
757 time_t now = time(NULL);
763 }
else if (now > (peer->
when_lost + 60)) {
765 crm_warn(
"Node %u is member of group %s but was believed " 767 member_list[i].nodeid, group_name->value);
772 if (local_nodeid == member_list[i].nodeid) {
778 crm_err(
"Local node was evicted from group %s", group_name->value);
796 if (cluster == NULL) {
799 cluster->cpg.cpg_deliver_fn = fn;
814 if (cluster == NULL) {
817 cluster->cpg.cpg_confchg_fn = fn;
836 cpg_handle_t handle = 0;
848 cpg_model_v1_data_t cpg_model_info = {
849 .model = CPG_MODEL_V1,
850 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
851 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
852 .cpg_totem_confchg_fn = NULL,
857 cluster->group.length = 0;
858 cluster->group.value[0] = 0;
861 strncpy(cluster->group.value, message_name, 127);
862 cluster->group.value[127] = 0;
863 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
865 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
867 crm_err(
"Could not connect to the CPG API: %s (%d)",
868 cs_strerror(rc), rc);
872 rc = cpg_fd_get(handle, &fd);
874 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
875 cs_strerror(rc), rc);
881 &found_uid, &found_gid))) {
882 crm_err(
"CPG provider is not authentic:" 883 " process %lld (uid: %lld, gid: %lld)",
885 (
long long) found_uid, (
long long) found_gid);
889 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
897 crm_err(
"Could not get local node id from the CPG API");
904 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
906 crm_err(
"Could not join the CPG group '%s': %d", message_name, rc);
910 pcmk_cpg_handle = handle;
911 cluster->cpg_handle = handle;
916 cpg_finalize(handle);
936 if (cluster->cpg_handle != 0) {
938 cpg_leave(cluster->cpg_handle, &cluster->group);
939 cpg_finalize(cluster->cpg_handle);
940 cluster->cpg_handle = 0;
963 static int msg_id = 0;
964 static int local_pid = 0;
965 static int local_name_len = 0;
966 static const char *local_name = NULL;
974 if (local_name == NULL) {
977 if ((local_name_len == 0) && (local_name != NULL)) {
978 local_name_len = strlen(local_name);
985 if (local_pid == 0) {
986 local_pid = getpid();
994 msg->header.error = CS_OK;
996 msg->host.type = dest;
997 msg->host.local =
local;
1000 if (node->
uname != NULL) {
1002 msg->host.size = strlen(node->
uname);
1003 memset(msg->host.uname, 0,
MAX_NAME);
1004 memcpy(msg->host.uname, node->
uname, msg->host.size);
1009 msg->host.id = node->
id;
1017 msg->sender.pid = local_pid;
1018 msg->sender.size = local_name_len;
1019 memset(msg->sender.uname, 0,
MAX_NAME);
1021 if ((local_name != NULL) && (msg->sender.size != 0)) {
1022 memcpy(msg->sender.uname, local_name, msg->sender.size);
1025 msg->size = 1 + strlen(
data);
1029 msg = pcmk__realloc(msg, msg->header.size);
1030 memcpy(msg->data,
data, msg->size);
1033 char *compressed = NULL;
1034 unsigned int new_size = 0;
1040 msg = pcmk__realloc(msg, msg->header.size);
1041 memcpy(msg->data, compressed, new_size);
1043 msg->is_compressed = TRUE;
1044 msg->compressed_size = new_size;
1049 msg = pcmk__realloc(msg, msg->header.size);
1050 memcpy(msg->data,
data, msg->size);
1057 iov->iov_base = msg;
1058 iov->iov_len = msg->header.size;
1060 if (msg->compressed_size > 0) {
1061 crm_trace(
"Queueing CPG message %u to %s " 1062 "(%llu bytes, %d bytes compressed payload): %.200s",
1063 msg->id,
target, (
unsigned long long) iov->iov_len,
1064 msg->compressed_size,
data);
1066 crm_trace(
"Queueing CPG message %u to %s " 1067 "(%llu bytes, %d bytes payload): %.200s",
1068 msg->id,
target, (
unsigned long long) iov->iov_len,
1074 cs_message_queue = g_list_append(cs_message_queue, iov);
1075 crm_cs_flush(&pcmk_cpg_handle);
1095 GString *
data = g_string_sized_new(1024);
1099 rc = send_cpg_text(
data->str,
false, node, dest);
1100 g_string_free(
data, TRUE);
1129 const struct cpg_name *group_name,
1130 const struct cpg_address *member_list,
1131 size_t member_list_entries,
1132 const struct cpg_address *left_list,
1133 size_t left_list_entries,
1134 const struct cpg_address *joined_list,
1135 size_t joined_list_entries)
1138 left_list, left_list_entries,
1139 joined_list, joined_list_entries);
1147 switch (msg_class) {
1149 return send_cpg_text(
data,
local, node, dest);
1151 crm_err(
"Invalid message class: %d", msg_class);
1158 void *content, uint32_t *kind,
const char **from)
1193 int scan_rc = sscanf(text,
"%d", &
type);
#define CRM_CHECK(expr, failure_action)
#define cs_repeat(rc, counter, max, code)
#define crm_notice(fmt, args...)
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
crm_node_t * pcmk__get_node(unsigned int id, const char *uname, const char *uuid, uint32_t flags)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
#define CRM_BZ2_THRESHOLD
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Search for cluster nodes from membership cache.
void(* destroy)(gpointer)
#define PCMK__SPECIAL_PID_AS_0(p)
bool pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node, enum crm_ais_msg_types dest)
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
const char * pcmk__cluster_local_node_name(void)
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
enum crm_ais_msg_types type
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Wrappers for and extensions to glib mainloop.
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG.
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
struct pcmk__cpg_host_s pcmk__cpg_host_t
#define crm_warn(fmt, args...)
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
#define crm_debug(fmt, args...)
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, uint32_t *kind, const char **from)
#define crm_trace(fmt, args...)
#define do_crm_log(level, fmt, args...)
Log a message.
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, const crm_node_t *node, enum crm_ais_msg_types dest)
#define CRM_SYSTEM_PENGINE
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object.
Wrappers for and extensions to libxml2.
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object.
#define pcmk__str_copy(str)
struct pcmk__cpg_host_s __attribute__((packed))
Deprecated Pacemaker cluster API.
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
#define pcmk__assert(expr)
#define PCMK_VALUE_ONLINE
gboolean cluster_connect_cpg(pcmk_cluster_t *cluster)
#define CRM_SYSTEM_STONITHD
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
#define CRM_SYSTEM_TENGINE
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
uint32_t get_local_nodeid(cpg_handle_t handle)
crm_node_t * pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
#define crm_err(fmt, args...)
enum crm_ais_msg_types pcmk__cluster_parse_msg_type(const char *text)
#define G_PRIORITY_MEDIUM
enum crm_ais_msg_types text2msg_type(const char *text)
#define msg_data_len(msg)
#define pcmk__plural_s(i)
void cluster_disconnect_cpg(pcmk_cluster_t *cluster)
IPC interface to Pacemaker daemons.
#define pcmk__assert_alloc(nmemb, size)
#define PCMK_VALUE_OFFLINE
#define crm_info(fmt, args...)
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)