15#include <netinet/in.h>
18#include <sys/socket.h>
20#include <sys/utsname.h>
23#include <corosync/corodefs.h>
24#include <corosync/corotypes.h>
25#include <corosync/hdb.h>
26#include <corosync/cpg.h>
27#include <qb/qbipc_common.h>
42static cpg_handle_t pcmk_cpg_handle = 0;
45static bool cpg_evicted =
false;
46static GList *cs_message_queue = NULL;
47static int cs_message_timer = 0;
54struct pcmk__cpg_host_s {
65struct pcmk__cpg_msg_s {
66 struct qb_ipc_response_header header
__attribute__ ((aligned(8)));
82static void crm_cs_flush(gpointer
data);
84#define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
86#define cs_repeat(rc, counter, max, code) do { \
88 if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \
90 crm_debug("Retrying operation after %ds", counter); \
95 } while (counter < max)
108 cs_error_t rc = CS_OK;
110 static uint32_t local_nodeid = 0;
111 cpg_handle_t local_handle = handle;
112 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
119 if (local_nodeid != 0) {
126 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
127 (cpg_model_data_t *) &cpg_model_info,
130 crm_err(
"Could not connect to the CPG API: %s (%d)",
131 cs_strerror(rc), rc);
135 rc = cpg_fd_get(local_handle, &fd);
137 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
138 cs_strerror(rc), rc);
144 &found_uid, &found_gid);
146 crm_err(
"CPG provider is not authentic:"
147 " process %lld (uid: %lld, gid: %lld)",
149 (
long long) found_uid, (
long long) found_gid);
153 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
162 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
166 crm_err(
"Could not get local node id from the CPG API: %s (%d)",
167 pcmk__cs_err_str(rc), rc);
173 cpg_finalize(local_handle);
175 crm_debug(
"Local nodeid is %u", local_nodeid);
188crm_cs_flush_cb(gpointer
data)
190 cs_message_timer = 0;
196#define CS_SEND_MAX 200
205crm_cs_flush(gpointer
data)
207 unsigned int sent = 0;
210 cpg_handle_t *handle = (cpg_handle_t *)
data;
217 queue_len = g_list_length(cs_message_queue);
218 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
219 crm_err(
"CPG queue has grown to %d", queue_len);
222 crm_warn(
"CPG queue has grown to %d", queue_len);
225 if (cs_message_timer != 0) {
227 crm_trace(
"Timer active %d", cs_message_timer);
231 while ((cs_message_queue != NULL) && (sent <
CS_SEND_MAX)) {
232 struct iovec *iov = cs_message_queue->data;
234 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
240 crm_trace(
"CPG message sent, size=%zu", iov->iov_len);
242 cs_message_queue = g_list_remove(cs_message_queue, iov);
249 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
253 if (cs_message_queue) {
254 uint32_t delay_ms = 100;
257 delay_ms = QB_MIN(1000,
CS_SEND_MAX + (10 * queue_len));
272pcmk_cpg_dispatch(gpointer user_data)
274 cs_error_t rc = CS_OK;
277 rc = cpg_dispatch(cluster->
priv->cpg_handle, CS_DISPATCH_ONE);
279 crm_err(
"Connection to the CPG API failed: %s (%d)",
280 pcmk__cs_err_str(rc), rc);
281 cpg_finalize(cluster->
priv->cpg_handle);
282 cluster->
priv->cpg_handle = 0;
285 }
else if (cpg_evicted) {
286 crm_err(
"Evicted from CPG membership");
292static inline const char *
295 return (
host->size > 0)?
host->uname :
"<all>";
298static inline const char *
303 return pcmk__s(
name,
"unknown");
319 if (payload_size < 1) {
320 crm_err(
"%sCPG message %d from %s invalid: "
321 "Claimed size of %d bytes is too small "
322 QB_XS
" from %s[%u] to %s@%s",
323 (msg->is_compressed?
"Compressed " :
""),
324 msg->id, ais_dest(&(msg->sender)),
325 (
int) msg->header.size,
326 msg_type2text(msg->sender.type), msg->sender.pid,
327 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
331 if (msg->header.error != CS_OK) {
332 crm_err(
"%sCPG message %d from %s invalid: "
333 "Sender indicated error %d "
334 QB_XS
" from %s[%u] to %s@%s",
335 (msg->is_compressed?
"Compressed " :
""),
336 msg->id, ais_dest(&(msg->sender)),
338 msg_type2text(msg->sender.type), msg->sender.pid,
339 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
344 crm_err(
"%sCPG message %d from %s invalid: "
345 "Total size %d inconsistent with payload size %d "
346 QB_XS
" from %s[%u] to %s@%s",
347 (msg->is_compressed?
"Compressed " :
""),
348 msg->id, ais_dest(&(msg->sender)),
350 msg_type2text(msg->sender.type), msg->sender.pid,
351 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
355 if (!msg->is_compressed &&
359 (((msg->size > 1) && (msg->data[msg->size - 2] ==
'\0'))
360 || (msg->data[msg->size - 1] !=
'\0'))) {
361 crm_err(
"CPG message %d from %s invalid: "
362 "Payload does not end at byte %" PRIu32
" "
363 QB_XS
" from %s[%u] to %s@%s",
364 msg->id, ais_dest(&(msg->sender)), msg->size,
365 msg_type2text(msg->sender.type), msg->sender.pid,
366 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
370 crm_trace(
"Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
371 (
int) msg->header.size, (msg->is_compressed?
"compressed " :
""),
372 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
373 ais_dest(&(msg->sender)),
374 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
396 void *content,
const char **from)
410 if (msg->sender.id == 0) {
411 msg->sender.id = sender_id;
412 }
else if (msg->sender.id != sender_id) {
413 crm_warn(
"Ignoring CPG message from ID %" PRIu32
" PID %" PRIu32
414 ": claimed ID %" PRIu32,
415 sender_id,
pid, msg->sender.id);
420 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
421 crm_trace(
"Ignoring CPG message from ID %" PRIu32
" PID %" PRIu32
422 ": for ID %" PRIu32
" not %" PRIu32,
423 sender_id,
pid, msg->host.id, local_nodeid);
426 if ((msg->host.size > 0)
429 crm_trace(
"Ignoring CPG message from ID %" PRIu32
" PID %" PRIu32
430 ": for name %s not %s",
431 sender_id,
pid, msg->host.uname, local_name);
436 if (msg->sender.size == 0) {
441 if (peer->
name == NULL) {
442 crm_debug(
"Received CPG message from node with ID %" PRIu32
443 " but its name is unknown", sender_id);
445 crm_debug(
"Updating name of CPG message sender with ID %" PRIu32
446 " to %s", sender_id, peer->
name);
447 msg->sender.size = strlen(peer->
name);
448 memset(msg->sender.uname, 0,
MAX_NAME);
449 memcpy(msg->sender.uname, peer->
name, msg->sender.size);
459 *from = msg->sender.uname;
462 if (!check_message_sanity(msg)) {
466 if (msg->is_compressed && (msg->size > 0)) {
468 unsigned int new_size = msg->size + 1;
471 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
472 msg->compressed_size, 1, 0);
474 if ((rc ==
pcmk_rc_ok) && (msg->size != new_size)) {
479 crm_warn(
"Ignoring compressed CPG message %d from %s (ID %" PRIu32
480 " PID %" PRIu32
"): %s",
481 msg->id, ais_dest(&(msg->sender)), sender_id,
pid,
491 crm_trace(
"Received %sCPG message %d from %s (ID %" PRIu32
492 " PID %" PRIu32
"): %.40s...",
493 (msg->is_compressed?
"compressed " :
""),
494 msg->id, ais_dest(&(msg->sender)), sender_id,
pid, msg->data);
510cmp_member_list_nodeid(
const void *first,
const void *second)
512 const struct cpg_address *
const a = *((
const struct cpg_address **) first),
513 *
const b = *((
const struct cpg_address **) second);
514 if (a->nodeid < b->nodeid) {
516 }
else if (a->nodeid > b->nodeid) {
532cpgreason2str(cpg_reason_t reason)
535 case CPG_REASON_JOIN:
return " via cpg_join";
536 case CPG_REASON_LEAVE:
return " via cpg_leave";
537 case CPG_REASON_NODEDOWN:
return " via cluster exit";
538 case CPG_REASON_NODEUP:
return " via cluster join";
539 case CPG_REASON_PROCDOWN:
return " for unknown reason";
553static inline const char *
556 return (peer != NULL)? pcmk__s(peer->
name,
"peer node") :
"unknown node";
571node_left(
const char *cpg_group_name,
int event_counter,
572 uint32_t local_nodeid,
const struct cpg_address *cpg_peer,
573 const struct cpg_address **sorted_member_list,
574 size_t member_list_entries)
579 const struct cpg_address **rival = NULL;
594 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
595 sizeof(
const struct cpg_address *),
596 cmp_member_list_nodeid);
600 crm_info(
"Group %s event %d: %s (node %u pid %u) left%s",
601 cpg_group_name, event_counter, peer_name(peer),
602 cpg_peer->nodeid, cpg_peer->pid,
603 cpgreason2str(cpg_peer->reason));
608 }
else if (cpg_peer->nodeid == local_nodeid) {
609 crm_warn(
"Group %s event %d: duplicate local pid %u left%s",
610 cpg_group_name, event_counter,
611 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
614 "%s (node %u) duplicate pid %u left%s (%u remains)",
615 cpg_group_name, event_counter, peer_name(peer),
616 cpg_peer->nodeid, cpg_peer->pid,
617 cpgreason2str(cpg_peer->reason), (*rival)->pid);
639 const struct cpg_name *group_name,
640 const struct cpg_address *member_list,
641 size_t member_list_entries,
642 const struct cpg_address *left_list,
643 size_t left_list_entries,
644 const struct cpg_address *joined_list,
645 size_t joined_list_entries)
647 static int counter = 0;
651 const struct cpg_address **sorted = NULL;
654 sizeof(
const struct cpg_address *));
656 for (
size_t iter = 0; iter < member_list_entries; iter++) {
657 sorted[iter] = member_list + iter;
661 qsort(sorted, member_list_entries,
sizeof(
const struct cpg_address *),
662 cmp_member_list_nodeid);
664 for (
int i = 0; i < left_list_entries; i++) {
665 node_left(group_name->value, counter, local_nodeid, &left_list[i],
666 sorted, member_list_entries);
671 for (
int i = 0; i < joined_list_entries; i++) {
672 crm_info(
"Group %s event %d: node %u pid %u joined%s",
673 group_name->value, counter, joined_list[i].nodeid,
674 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
677 for (
int i = 0; i < member_list_entries; i++) {
682 if (member_list[i].nodeid == local_nodeid
683 && member_list[i].
pid != getpid()) {
685 crm_warn(
"Group %s event %d: detected duplicate local pid %u",
686 group_name->value, counter, member_list[i].pid);
689 crm_info(
"Group %s event %d: %s (node %u pid %u) is member",
690 group_name->value, counter, peer_name(peer),
691 member_list[i].nodeid, member_list[i].
pid);
707 time_t now = time(NULL);
713 }
else if (now > (peer->
when_lost + 60)) {
715 crm_warn(
"Node %u is member of group %s but was believed "
717 member_list[i].nodeid, group_name->value);
722 if (local_nodeid == member_list[i].nodeid) {
728 crm_err(
"Local node was evicted from group %s", group_name->value);
746 if (cluster == NULL) {
749 cluster->cpg.cpg_deliver_fn = fn;
764 if (cluster == NULL) {
767 cluster->cpg.cpg_confchg_fn = fn;
786 cpg_handle_t handle = 0;
787 const char *cpg_group_name = NULL;
794 .dispatch = pcmk_cpg_dispatch,
798 cpg_model_v1_data_t cpg_model_info = {
799 .model = CPG_MODEL_V1,
800 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
801 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
802 .cpg_totem_confchg_fn = NULL,
812 if (cpg_group_name == NULL) {
819 memset(cluster->
priv->group.value, 0, 128);
820 strncpy(cluster->
priv->group.value, cpg_group_name, 127);
821 cluster->
priv->group.length = strlen(cluster->
priv->group.value) + 1;
823 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
825 crm_err(
"Could not connect to the CPG API: %s (%d)",
826 cs_strerror(rc), rc);
830 rc = cpg_fd_get(handle, &fd);
832 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
833 cs_strerror(rc), rc);
839 &found_uid, &found_gid))) {
840 crm_err(
"CPG provider is not authentic:"
841 " process %lld (uid: %lld, gid: %lld)",
843 (
long long) found_uid, (
long long) found_gid);
847 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
855 crm_err(
"Could not get local node id from the CPG API");
862 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->
priv->group));
864 crm_err(
"Could not join the CPG group '%s': %d", cpg_group_name, rc);
868 pcmk_cpg_handle = handle;
869 cluster->
priv->cpg_handle = handle;
874 cpg_finalize(handle);
894 if (cluster->
priv->cpg_handle != 0) {
896 cpg_leave(cluster->
priv->cpg_handle, &cluster->
priv->group);
897 cpg_finalize(cluster->
priv->cpg_handle);
898 cluster->
priv->cpg_handle = 0;
919 static int msg_id = 0;
920 static int local_pid = 0;
921 static int local_name_len = 0;
922 static const char *local_name = NULL;
928 if (local_name == NULL) {
931 if ((local_name_len == 0) && (local_name != NULL)) {
932 local_name_len = strlen(local_name);
939 if (local_pid == 0) {
940 local_pid = getpid();
947 msg->header.error = CS_OK;
949 msg->host.type = dest;
952 if (node->
name != NULL) {
954 msg->host.size = strlen(node->
name);
955 memset(msg->host.uname, 0,
MAX_NAME);
956 memcpy(msg->host.uname, node->
name, msg->host.size);
969 msg->sender.pid = local_pid;
970 msg->sender.size = local_name_len;
971 memset(msg->sender.uname, 0,
MAX_NAME);
973 if ((local_name != NULL) && (msg->sender.size != 0)) {
974 memcpy(msg->sender.uname, local_name, msg->sender.size);
977 msg->size = 1 + strlen(
data);
981 msg = pcmk__realloc(msg, msg->header.size);
982 memcpy(msg->data,
data, msg->size);
985 char *compressed = NULL;
986 unsigned int new_size = 0;
992 msg = pcmk__realloc(msg, msg->header.size);
993 memcpy(msg->data, compressed, new_size);
995 msg->is_compressed = TRUE;
996 msg->compressed_size = new_size;
999 msg = pcmk__realloc(msg, msg->header.size);
1000 memcpy(msg->data,
data, msg->size);
1007 iov->iov_base = msg;
1008 iov->iov_len = msg->header.size;
1010 if (msg->compressed_size > 0) {
1011 crm_trace(
"Queueing CPG message %" PRIu32
" to %s "
1012 "(%zu bytes, %" PRIu32
" bytes compressed payload): %.200s",
1013 msg->id,
target, iov->iov_len, msg->compressed_size,
data);
1015 crm_trace(
"Queueing CPG message %" PRIu32
" to %s "
1016 "(%zu bytes, %" PRIu32
" bytes payload): %.200s",
1017 msg->id,
target, iov->iov_len, msg->size,
data);
1022 cs_message_queue = g_list_append(cs_message_queue, iov);
1023 crm_cs_flush(&pcmk_cpg_handle);
1043 GString *
data = g_string_sized_new(1024);
1047 rc = send_cpg_text(
data->str, node, dest);
1048 g_string_free(
data, TRUE);
@ pcmk__node_search_cluster_member
Search for cluster nodes from membership cache.
pcmk__node_status_t * pcmk__update_peer_state(const char *source, pcmk__node_status_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
const char * pcmk__cluster_local_node_name(void)
pcmk__node_status_t * pcmk__get_node(unsigned int id, const char *uname, const char *xml_id, uint32_t flags)
pcmk__node_status_t * crm_update_peer_proc(const char *source, pcmk__node_status_t *peer, uint32_t flag, const char *status)
pcmk__node_status_t * pcmk__search_node_caches(unsigned int id, const char *uname, const char *xml_id, uint32_t flags)
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
#define pcmk__assert_alloc(nmemb, size)
struct tcp_async_cb_data __attribute__
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from)
bool pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node, enum pcmk_ipc_server dest)
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object.
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG.
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object.
#define msg_data_len(msg)
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
#define cs_repeat(rc, counter, max, code)
enum pcmk_ipc_server type
struct pcmk__cpg_host_s pcmk__cpg_host_t
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
IPC interface to Pacemaker daemons.
pcmk_ipc_server
Available IPC interfaces.
@ pcmk_ipc_unknown
Unknown or invalid.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
#define PCMK__SPECIAL_PID_AS_0(p)
#define crm_info(fmt, args...)
#define do_crm_log(level, fmt, args...)
Log a message.
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
#define crm_err(fmt, args...)
#define crm_trace(fmt, args...)
Wrappers for and extensions to glib mainloop.
#define G_PRIORITY_MEDIUM
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
#define PCMK_VALUE_OFFLINE
#define PCMK_VALUE_MEMBER
#define PCMK_VALUE_ONLINE
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
enum pcmk_ipc_server pcmk__parse_server(const char *text)
const char * pcmk__server_message_type(enum pcmk_ipc_server server)
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define pcmk__plural_s(i)
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
#define pcmk__str_copy(str)
uint32_t node_id
Local node ID at cluster layer.
enum pcmk_ipc_server server
Server this connection is for (if any)
pcmk__cluster_private_t * priv
void(* destroy)(gpointer)
Node status data (may be a cluster node or a Pacemaker Remote node)
uint32_t cluster_layer_id
Cluster-layer numeric node ID.
char * name
Node name as known to cluster layer, or Pacemaker Remote node name.
time_t when_lost
When CPG membership was last lost.
Wrappers for and extensions to libxml2.
#define CRM_BZ2_THRESHOLD
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)