12 #include <sys/socket.h> 13 #include <netinet/in.h> 14 #include <arpa/inet.h> 20 #include <sys/utsname.h> 22 #include <qb/qbipc_common.h> 23 #include <qb/qbipcc.h> 24 #include <qb/qbutil.h> 26 #include <corosync/corodefs.h> 27 #include <corosync/corotypes.h> 28 #include <corosync/hdb.h> 29 #include <corosync/cpg.h> 39 static cpg_handle_t pcmk_cpg_handle = 0;
42 static bool cpg_evicted =
false;
43 static GList *cs_message_queue = NULL;
44 static int cs_message_timer = 0;
46 struct pcmk__cpg_host_s {
57 struct pcmk__cpg_msg_s {
58 struct qb_ipc_response_header header
__attribute__ ((aligned(8)));
74 static void crm_cs_flush(gpointer
data);
76 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size) 78 #define cs_repeat(rc, counter, max, code) do { \ 80 if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \ 82 crm_debug("Retrying operation after %ds", counter); \ 87 } while (counter < max) 98 if (cluster->cpg_handle) {
100 cpg_leave(cluster->cpg_handle, &cluster->group);
101 cpg_finalize(cluster->cpg_handle);
102 cluster->cpg_handle = 0;
119 cs_error_t
rc = CS_OK;
121 static uint32_t local_nodeid = 0;
122 cpg_handle_t local_handle = handle;
123 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
130 if(local_nodeid != 0) {
136 cs_repeat(
rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
138 crm_err(
"Could not connect to the CPG API: %s (%d)",
139 cs_strerror(
rc),
rc);
143 rc = cpg_fd_get(local_handle, &fd);
145 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
146 cs_strerror(
rc),
rc);
152 &found_uid, &found_gid))) {
153 crm_err(
"CPG provider is not authentic:" 154 " process %lld (uid: %lld, gid: %lld)",
156 (
long long) found_uid, (
long long) found_gid);
159 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
168 cs_repeat(
rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
172 crm_err(
"Could not get local node id from the CPG API: %s (%d)",
173 pcmk__cs_err_str(
rc),
rc);
179 cpg_finalize(local_handle);
181 crm_debug(
"Local nodeid is %u", local_nodeid);
194 crm_cs_flush_cb(gpointer
data)
196 cs_message_timer = 0;
202 #define CS_SEND_MAX 200 211 crm_cs_flush(gpointer
data)
213 unsigned int sent = 0;
216 cpg_handle_t *handle = (cpg_handle_t *)
data;
223 queue_len = g_list_length(cs_message_queue);
224 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225 crm_err(
"CPG queue has grown to %d", queue_len);
228 crm_warn(
"CPG queue has grown to %d", queue_len);
231 if (cs_message_timer != 0) {
233 crm_trace(
"Timer active %d", cs_message_timer);
237 while ((cs_message_queue != NULL) && (sent <
CS_SEND_MAX)) {
238 struct iovec *iov = cs_message_queue->data;
240 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
247 (
unsigned long long) iov->iov_len);
249 cs_message_queue = g_list_remove(cs_message_queue, iov);
255 if ((sent > 1) || (cs_message_queue != NULL)) {
256 crm_info(
"Sent %u CPG messages (%d remaining): %s (%d)",
257 sent, queue_len, pcmk__cs_err_str(
rc), (
int)
rc);
259 crm_trace(
"Sent %u CPG messages (%d remaining): %s (%d)",
260 sent, queue_len, pcmk__cs_err_str(
rc), (
int)
rc);
263 if (cs_message_queue) {
264 uint32_t delay_ms = 100;
267 delay_ms = QB_MIN(1000,
CS_SEND_MAX + (10 * queue_len));
269 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb,
data);
282 pcmk_cpg_dispatch(gpointer user_data)
284 cs_error_t
rc = CS_OK;
287 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
289 crm_err(
"Connection to the CPG API failed: %s (%d)",
290 pcmk__cs_err_str(
rc),
rc);
291 cpg_finalize(cluster->cpg_handle);
292 cluster->cpg_handle = 0;
295 }
else if (cpg_evicted) {
296 crm_err(
"Evicted from CPG membership");
302 static inline const char *
307 }
else if (
host->size > 0) {
314 static inline const char *
317 const char *text =
"unknown";
367 if (payload_size < 1) {
368 crm_err(
"%sCPG message %d from %s invalid: " 369 "Claimed size of %d bytes is too small " 370 CRM_XS " from %s[%u] to %s@%s",
371 (msg->is_compressed?
"Compressed " :
""),
372 msg->id, ais_dest(&(msg->sender)),
373 (
int) msg->header.size,
374 msg_type2text(msg->sender.type), msg->sender.pid,
375 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
379 if (msg->header.error != CS_OK) {
380 crm_err(
"%sCPG message %d from %s invalid: " 381 "Sender indicated error %d " 382 CRM_XS " from %s[%u] to %s@%s",
383 (msg->is_compressed?
"Compressed " :
""),
384 msg->id, ais_dest(&(msg->sender)),
386 msg_type2text(msg->sender.type), msg->sender.pid,
387 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
392 crm_err(
"%sCPG message %d from %s invalid: " 393 "Total size %d inconsistent with payload size %d " 394 CRM_XS " from %s[%u] to %s@%s",
395 (msg->is_compressed?
"Compressed " :
""),
396 msg->id, ais_dest(&(msg->sender)),
398 msg_type2text(msg->sender.type), msg->sender.pid,
399 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
403 if (!msg->is_compressed &&
407 (((msg->size > 1) && (msg->data[msg->size - 2] ==
'\0'))
408 || (msg->data[msg->size - 1] !=
'\0'))) {
409 crm_err(
"CPG message %d from %s invalid: " 410 "Payload does not end at byte %llu " 411 CRM_XS " from %s[%u] to %s@%s",
412 msg->id, ais_dest(&(msg->sender)),
413 (
unsigned long long) msg->size,
414 msg_type2text(msg->sender.type), msg->sender.pid,
415 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
419 crm_trace(
"Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
420 (
int) msg->header.size, (msg->is_compressed?
"compressed " :
""),
421 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
422 ais_dest(&(msg->sender)),
423 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
445 uint32_t *kind,
const char **from)
455 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
456 crm_err(
"Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid,
pid, msg->sender.id);
459 }
else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
461 crm_trace(
"Not for us: %u != %u", msg->host.id, local_nodeid);
463 }
else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name,
pcmk__str_casei)) {
465 crm_trace(
"Not for us: %s != %s", msg->host.uname, local_name);
469 msg->sender.id = nodeid;
470 if (msg->sender.size == 0) {
474 crm_err(
"Peer with nodeid=%u is unknown", nodeid);
476 }
else if (peer->
uname == NULL) {
477 crm_err(
"No uname for peer with nodeid=%u", nodeid);
480 crm_notice(
"Fixing uname for peer with nodeid=%u", nodeid);
481 msg->sender.size = strlen(peer->
uname);
482 memset(msg->sender.uname, 0,
MAX_NAME);
483 memcpy(msg->sender.uname, peer->
uname, msg->sender.size);
488 crm_trace(
"Got new%s message (size=%d, %d, %d)",
489 msg->is_compressed ?
" compressed" :
"",
493 *kind = msg->header.id;
496 *from = msg->sender.uname;
499 if (msg->is_compressed && msg->size > 0) {
501 char *uncompressed = NULL;
502 unsigned int new_size = msg->size + 1;
504 if (!check_message_sanity(msg)) {
509 uncompressed = calloc(1, new_size);
510 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
524 }
else if (!check_message_sanity(msg)) {
528 data = strdup(msg->data);
538 crm_err(
"Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" 539 " min=%d, total=%d, size=%d, bz2_size=%d",
540 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
541 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
543 msg->header.size, msg->size, msg->compressed_size);
561 cmp_member_list_nodeid(
const void *first,
const void *second)
563 const struct cpg_address *
const a = *((
const struct cpg_address **) first),
564 *
const b = *((
const struct cpg_address **) second);
565 if (a->nodeid < b->nodeid) {
567 }
else if (a->nodeid > b->nodeid) {
583 cpgreason2str(cpg_reason_t reason)
586 case CPG_REASON_JOIN:
return " via cpg_join";
587 case CPG_REASON_LEAVE:
return " via cpg_leave";
588 case CPG_REASON_NODEDOWN:
return " via cluster exit";
589 case CPG_REASON_NODEUP:
return " via cluster join";
590 case CPG_REASON_PROCDOWN:
return " for unknown reason";
604 static inline const char *
608 return "unknown node";
609 }
else if (peer->
uname == NULL) {
630 const struct cpg_name *groupName,
631 const struct cpg_address *member_list,
size_t member_list_entries,
632 const struct cpg_address *left_list,
size_t left_list_entries,
633 const struct cpg_address *joined_list,
size_t joined_list_entries)
636 gboolean found = FALSE;
637 static int counter = 0;
639 const struct cpg_address *key, **sorted;
641 sorted = malloc(member_list_entries *
sizeof(
const struct cpg_address *));
644 for (
size_t iter = 0; iter < member_list_entries; iter++) {
645 sorted[iter] = member_list + iter;
648 qsort(sorted, member_list_entries,
sizeof(
const struct cpg_address *),
649 cmp_member_list_nodeid);
651 for (i = 0; i < left_list_entries; i++) {
654 const struct cpg_address **rival = NULL;
673 rival = bsearch(&key, sorted, member_list_entries,
674 sizeof(
const struct cpg_address *),
675 cmp_member_list_nodeid);
679 crm_info(
"Group %s event %d: %s (node %u pid %u) left%s",
680 groupName->value, counter, peer_name(peer),
681 left_list[i].nodeid, left_list[i].
pid,
682 cpgreason2str(left_list[i].reason));
687 }
else if (left_list[i].nodeid == local_nodeid) {
688 crm_warn(
"Group %s event %d: duplicate local pid %u left%s",
689 groupName->value, counter,
690 left_list[i].pid, cpgreason2str(left_list[i].reason));
693 "%s (node %u) duplicate pid %u left%s (%u remains)",
694 groupName->value, counter, peer_name(peer),
695 left_list[i].nodeid, left_list[i].
pid,
696 cpgreason2str(left_list[i].reason), (*rival)->pid);
702 for (i = 0; i < joined_list_entries; i++) {
703 crm_info(
"Group %s event %d: node %u pid %u joined%s",
704 groupName->value, counter, joined_list[i].nodeid,
705 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
708 for (i = 0; i < member_list_entries; i++) {
711 if (member_list[i].nodeid == local_nodeid
712 && member_list[i].
pid != getpid()) {
714 crm_warn(
"Group %s event %d: detected duplicate local pid %u",
715 groupName->value, counter, member_list[i].pid);
718 crm_info(
"Group %s event %d: %s (node %u pid %u) is member",
719 groupName->value, counter, peer_name(peer),
720 member_list[i].nodeid, member_list[i].
pid);
736 time_t now = time(NULL);
742 }
else if (now > (peer->
when_lost + 60)) {
744 crm_warn(
"Node %u is member of group %s but was believed offline",
745 member_list[i].nodeid, groupName->value);
750 if (local_nodeid == member_list[i].nodeid) {
756 crm_err(
"Local node was evicted from group %s", groupName->value);
778 cpg_handle_t handle = 0;
790 cpg_model_v1_data_t cpg_model_info = {
791 .model = CPG_MODEL_V1,
792 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
793 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
794 .cpg_totem_confchg_fn = NULL,
799 cluster->group.length = 0;
800 cluster->group.value[0] = 0;
803 strncpy(cluster->group.value, message_name, 127);
804 cluster->group.value[127] = 0;
805 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
807 cs_repeat(
rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
809 crm_err(
"Could not connect to the CPG API: %s (%d)",
810 cs_strerror(
rc),
rc);
814 rc = cpg_fd_get(handle, &fd);
816 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
817 cs_strerror(
rc),
rc);
823 &found_uid, &found_gid))) {
824 crm_err(
"CPG provider is not authentic:" 825 " process %lld (uid: %lld, gid: %lld)",
827 (
long long) found_uid, (
long long) found_gid);
831 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
839 crm_err(
"Could not get local node id from the CPG API");
846 cs_repeat(
rc, retries, 30, cpg_join(handle, &cluster->group));
848 crm_err(
"Could not join the CPG group '%s': %d", message_name,
rc);
852 pcmk_cpg_handle = handle;
853 cluster->cpg_handle = handle;
858 cpg_finalize(handle);
905 static int msg_id = 0;
906 static int local_pid = 0;
907 static int local_name_len = 0;
908 static const char *local_name = NULL;
919 crm_err(
"Invalid message class: %d", msg_class);
925 if (local_name == NULL) {
928 if ((local_name_len == 0) && (local_name != NULL)) {
929 local_name_len = strlen(local_name);
936 if (local_pid == 0) {
937 local_pid = getpid();
948 msg->header.id = msg_class;
949 msg->header.error = CS_OK;
951 msg->host.type = dest;
952 msg->host.local =
local;
957 msg->host.size = strlen(node->
uname);
958 memset(msg->host.uname, 0,
MAX_NAME);
959 memcpy(msg->host.uname, node->
uname, msg->host.size);
963 msg->host.id = node->
id;
969 msg->sender.type =
sender;
970 msg->sender.pid = local_pid;
971 msg->sender.size = local_name_len;
972 memset(msg->sender.uname, 0,
MAX_NAME);
973 if ((local_name != NULL) && (msg->sender.size != 0)) {
974 memcpy(msg->sender.uname, local_name, msg->sender.size);
977 msg->size = 1 + strlen(
data);
981 msg = pcmk__realloc(msg, msg->header.size);
982 memcpy(msg->data,
data, msg->size);
985 char *compressed = NULL;
986 unsigned int new_size = 0;
987 char *uncompressed = strdup(
data);
993 msg = pcmk__realloc(msg, msg->header.size);
994 memcpy(msg->data, compressed, new_size);
996 msg->is_compressed = TRUE;
997 msg->compressed_size = new_size;
1002 msg = pcmk__realloc(msg, msg->header.size);
1003 memcpy(msg->data,
data, msg->size);
1010 iov = calloc(1,
sizeof(
struct iovec));
1011 iov->iov_base = msg;
1012 iov->iov_len = msg->header.size;
1014 if (msg->compressed_size) {
1015 crm_trace(
"Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1016 msg->id,
target, (
unsigned long long) iov->iov_len,
1017 msg->compressed_size,
data);
1019 crm_trace(
"Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1020 msg->id,
target, (
unsigned long long) iov->iov_len,
1025 cs_message_queue = g_list_append(cs_message_queue, iov);
1026 crm_cs_flush(&pcmk_cpg_handle);
1068 int scan_rc = sscanf(text,
"%d", &
type);
#define CRM_CHECK(expr, failure_action)
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
#define cs_repeat(rc, counter, max, code)
#define crm_notice(fmt, args...)
const char * bz2_strerror(int rc)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
const char * get_local_node_name(void)
Get the local node's name.
void(* destroy)(gpointer)
#define PCMK__SPECIAL_PID_AS_0(p)
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry.
enum crm_ais_msg_types type
char * strerror(int errnum)
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message.
Wrappers for and extensions to glib mainloop.
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG.
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
struct pcmk__cpg_host_s pcmk__cpg_host_t
#define crm_warn(fmt, args...)
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
#define crm_debug(fmt, args...)
#define crm_trace(fmt, args...)
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define CRM_SYSTEM_PENGINE
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
struct pcmk__cpg_host_s __attribute__((packed))
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
#define CRM_SYSTEM_STONITHD
#define CRM_SYSTEM_TENGINE
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG)
#define crm_err(fmt, args...)
#define G_PRIORITY_MEDIUM
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string.
#define CRM_BZ2_THRESHOLD
char * dump_xml_unformatted(xmlNode *msg)
#define msg_data_len(msg)
IPC interface to Pacemaker daemons.
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event.
#define crm_info(fmt, args...)
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)