12 #include <sys/socket.h>    13 #include <netinet/in.h>    14 #include <arpa/inet.h>    20 #include <sys/utsname.h>    22 #include <qb/qbipc_common.h>    23 #include <qb/qbipcc.h>    24 #include <qb/qbutil.h>    26 #include <corosync/corodefs.h>    27 #include <corosync/corotypes.h>    28 #include <corosync/hdb.h>    29 #include <corosync/cpg.h>    39 static cpg_handle_t pcmk_cpg_handle = 0;
    42 static bool cpg_evicted = 
false;
    43 static GList *cs_message_queue = NULL;
    44 static int cs_message_timer = 0;
    46 struct pcmk__cpg_host_s {
    57 struct pcmk__cpg_msg_s {
    58     struct qb_ipc_response_header header 
__attribute__ ((aligned(8)));
    74 static void crm_cs_flush(gpointer 
data);
    76 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)    78 #define cs_repeat(rc, counter, max, code) do {                          \    80         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \    82             crm_debug("Retrying operation after %ds", counter);         \    87     } while (counter < max)    98     if (cluster->cpg_handle) {
   100         cpg_leave(cluster->cpg_handle, &cluster->group);
   101         cpg_finalize(cluster->cpg_handle);
   102         cluster->cpg_handle = 0;
   119     cs_error_t rc = CS_OK;
   121     static uint32_t local_nodeid = 0;
   122     cpg_handle_t local_handle = handle;
   123     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
   130     if(local_nodeid != 0) {
   136         cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
   138             crm_err(
"Could not connect to the CPG API: %s (%d)",
   139                     cs_strerror(rc), rc);
   143         rc = cpg_fd_get(local_handle, &fd);
   145             crm_err(
"Could not obtain the CPG API connection: %s (%d)",
   146                     cs_strerror(rc), rc);
   152                                                 &found_uid, &found_gid))) {
   153             crm_err(
"CPG provider is not authentic:"   154                     " process %lld (uid: %lld, gid: %lld)",
   156                     (
long long) found_uid, (
long long) found_gid);
   159             crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
   168         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
   172         crm_err(
"Could not get local node id from the CPG API: %s (%d)",
   173                 pcmk__cs_err_str(rc), rc);
   179         cpg_finalize(local_handle);
   181     crm_debug(
"Local nodeid is %u", local_nodeid);
   194 crm_cs_flush_cb(gpointer 
data)
   196     cs_message_timer = 0;
   202 #define CS_SEND_MAX 200   211 crm_cs_flush(gpointer 
data)
   213     unsigned int sent = 0;
   216     cpg_handle_t *handle = (cpg_handle_t *) 
data;
   223     queue_len = g_list_length(cs_message_queue);
   224     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
   225         crm_err(
"CPG queue has grown to %d", queue_len);
   228         crm_warn(
"CPG queue has grown to %d", queue_len);
   231     if (cs_message_timer != 0) {
   233         crm_trace(
"Timer active %d", cs_message_timer);
   237     while ((cs_message_queue != NULL) && (sent < 
CS_SEND_MAX)) {
   238         struct iovec *iov = cs_message_queue->data;
   240         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
   247                   (
unsigned long long) iov->iov_len);
   249         cs_message_queue = g_list_remove(cs_message_queue, iov);
   256                "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
   260     if (cs_message_queue) {
   261         uint32_t delay_ms = 100;
   264             delay_ms = QB_MIN(1000, 
CS_SEND_MAX + (10 * queue_len));
   266         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, 
data);
   279 pcmk_cpg_dispatch(gpointer user_data)
   281     cs_error_t rc = CS_OK;
   284     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
   286         crm_err(
"Connection to the CPG API failed: %s (%d)",
   287                 pcmk__cs_err_str(rc), rc);
   288         cpg_finalize(cluster->cpg_handle);
   289         cluster->cpg_handle = 0;
   292     } 
else if (cpg_evicted) {
   293         crm_err(
"Evicted from CPG membership");
   299 static inline const char *
   304     } 
else if (
host->size > 0) {
   311 static inline const char *
   314     const char *text = 
"unknown";
   364     if (payload_size < 1) {
   365         crm_err(
"%sCPG message %d from %s invalid: "   366                 "Claimed size of %d bytes is too small "   367                 CRM_XS " from %s[%u] to %s@%s",
   368                 (msg->is_compressed? 
"Compressed " : 
""),
   369                 msg->id, ais_dest(&(msg->sender)),
   370                 (
int) msg->header.size,
   371                 msg_type2text(msg->sender.type), msg->sender.pid,
   372                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   376     if (msg->header.error != CS_OK) {
   377         crm_err(
"%sCPG message %d from %s invalid: "   378                 "Sender indicated error %d "   379                 CRM_XS " from %s[%u] to %s@%s",
   380                 (msg->is_compressed? 
"Compressed " : 
""),
   381                 msg->id, ais_dest(&(msg->sender)),
   383                 msg_type2text(msg->sender.type), msg->sender.pid,
   384                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   389         crm_err(
"%sCPG message %d from %s invalid: "   390                 "Total size %d inconsistent with payload size %d "   391                 CRM_XS " from %s[%u] to %s@%s",
   392                 (msg->is_compressed? 
"Compressed " : 
""),
   393                 msg->id, ais_dest(&(msg->sender)),
   395                 msg_type2text(msg->sender.type), msg->sender.pid,
   396                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   400     if (!msg->is_compressed &&
   404         (((msg->size > 1) && (msg->data[msg->size - 2] == 
'\0'))
   405          || (msg->data[msg->size - 1] != 
'\0'))) {
   406         crm_err(
"CPG message %d from %s invalid: "   407                 "Payload does not end at byte %llu "   408                 CRM_XS " from %s[%u] to %s@%s",
   409                 msg->id, ais_dest(&(msg->sender)),
   410                 (
unsigned long long) msg->size,
   411                 msg_type2text(msg->sender.type), msg->sender.pid,
   412                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   416     crm_trace(
"Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
   417               (
int) msg->header.size, (msg->is_compressed? 
"compressed " : 
""),
   418               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
   419               ais_dest(&(msg->sender)),
   420               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   442                         uint32_t *kind, 
const char **from)
   452         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
   453             crm_err(
"Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, 
pid, msg->sender.id);
   456         } 
else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
   458             crm_trace(
"Not for us: %u != %u", msg->host.id, local_nodeid);
   460         } 
else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, 
pcmk__str_casei)) {
   462             crm_trace(
"Not for us: %s != %s", msg->host.uname, local_name);
   466         msg->sender.id = nodeid;
   467         if (msg->sender.size == 0) {
   471                 crm_err(
"Peer with nodeid=%u is unknown", nodeid);
   473             } 
else if (peer->
uname == NULL) {
   474                 crm_err(
"No uname for peer with nodeid=%u", nodeid);
   477                 crm_notice(
"Fixing uname for peer with nodeid=%u", nodeid);
   478                 msg->sender.size = strlen(peer->
uname);
   479                 memset(msg->sender.uname, 0, 
MAX_NAME);
   480                 memcpy(msg->sender.uname, peer->
uname, msg->sender.size);
   485     crm_trace(
"Got new%s message (size=%d, %d, %d)",
   486               msg->is_compressed ? 
" compressed" : 
"",
   490         *kind = msg->header.id;
   493         *from = msg->sender.uname;
   496     if (msg->is_compressed && msg->size > 0) {
   498         char *uncompressed = NULL;
   499         unsigned int new_size = msg->size + 1;
   501         if (!check_message_sanity(msg)) {
   506         uncompressed = calloc(1, new_size);
   507         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
   521     } 
else if (!check_message_sanity(msg)) {
   525         data = strdup(msg->data);
   535     crm_err(
"Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"   536             " min=%d, total=%d, size=%d, bz2_size=%d",
   537             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
   538             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
   540             msg->header.size, msg->size, msg->compressed_size);
   558 cmp_member_list_nodeid(
const void *first, 
const void *second)
   560     const struct cpg_address *
const a = *((
const struct cpg_address **) first),
   561                              *
const b = *((
const struct cpg_address **) second);
   562     if (a->nodeid < b->nodeid) {
   564     } 
else if (a->nodeid > b->nodeid) {
   580 cpgreason2str(cpg_reason_t reason)
   583         case CPG_REASON_JOIN:       
return " via cpg_join";
   584         case CPG_REASON_LEAVE:      
return " via cpg_leave";
   585         case CPG_REASON_NODEDOWN:   
return " via cluster exit";
   586         case CPG_REASON_NODEUP:     
return " via cluster join";
   587         case CPG_REASON_PROCDOWN:   
return " for unknown reason";
   601 static inline const char *
   605         return "unknown node";
   606     } 
else if (peer->
uname == NULL) {
   625 node_left(
const char *cpg_group_name, 
int event_counter,
   626           uint32_t local_nodeid, 
const struct cpg_address *cpg_peer,
   627           const struct cpg_address **sorted_member_list,
   628           size_t member_list_entries)
   632     const struct cpg_address **rival = NULL;
   647         rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
   648                         sizeof(
const struct cpg_address *),
   649                         cmp_member_list_nodeid);
   653         crm_info(
"Group %s event %d: %s (node %u pid %u) left%s",
   654                  cpg_group_name, event_counter, peer_name(peer),
   655                  cpg_peer->nodeid, cpg_peer->pid,
   656                  cpgreason2str(cpg_peer->reason));
   661     } 
else if (cpg_peer->nodeid == local_nodeid) {
   662         crm_warn(
"Group %s event %d: duplicate local pid %u left%s",
   663                  cpg_group_name, event_counter,
   664                  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
   667                  "%s (node %u) duplicate pid %u left%s (%u remains)",
   668                  cpg_group_name, event_counter, peer_name(peer),
   669                  cpg_peer->nodeid, cpg_peer->pid,
   670                  cpgreason2str(cpg_peer->reason), (*rival)->pid);
   688                     const struct cpg_name *groupName,
   689                     const struct cpg_address *member_list, 
size_t member_list_entries,
   690                     const struct cpg_address *left_list, 
size_t left_list_entries,
   691                     const struct cpg_address *joined_list, 
size_t joined_list_entries)
   694     gboolean found = FALSE;
   695     static int counter = 0;
   697     const struct cpg_address **sorted;
   699     sorted = malloc(member_list_entries * 
sizeof(
const struct cpg_address *));
   702     for (
size_t iter = 0; iter < member_list_entries; iter++) {
   703         sorted[iter] = member_list + iter;
   706     qsort(sorted, member_list_entries, 
sizeof(
const struct cpg_address *),
   707           cmp_member_list_nodeid);
   709     for (i = 0; i < left_list_entries; i++) {
   710         node_left(groupName->value, counter, local_nodeid, &left_list[i],
   711                   sorted, member_list_entries);
   716     for (i = 0; i < joined_list_entries; i++) {
   717         crm_info(
"Group %s event %d: node %u pid %u joined%s",
   718                  groupName->value, counter, joined_list[i].nodeid,
   719                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
   722     for (i = 0; i < member_list_entries; i++) {
   725         if (member_list[i].nodeid == local_nodeid
   726                 && member_list[i].
pid != getpid()) {
   728             crm_warn(
"Group %s event %d: detected duplicate local pid %u",
   729                      groupName->value, counter, member_list[i].pid);
   732         crm_info(
"Group %s event %d: %s (node %u pid %u) is member",
   733                  groupName->value, counter, peer_name(peer),
   734                  member_list[i].nodeid, member_list[i].
pid);
   750             time_t now = time(NULL);
   756             } 
else if (now > (peer->
when_lost + 60)) {
   758                 crm_warn(
"Node %u is member of group %s but was believed offline",
   759                          member_list[i].nodeid, groupName->value);
   764         if (local_nodeid == member_list[i].nodeid) {
   770         crm_err(
"Local node was evicted from group %s", groupName->value);
   792     cpg_handle_t handle = 0;
   804     cpg_model_v1_data_t cpg_model_info = {
   805         .model = CPG_MODEL_V1,
   806         .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
   807         .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
   808         .cpg_totem_confchg_fn = NULL,
   813     cluster->group.length = 0;
   814     cluster->group.value[0] = 0;
   817     strncpy(cluster->group.value, message_name, 127);
   818     cluster->group.value[127] = 0;
   819     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
   821     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
   823         crm_err(
"Could not connect to the CPG API: %s (%d)",
   824                 cs_strerror(rc), rc);
   828     rc = cpg_fd_get(handle, &fd);
   830         crm_err(
"Could not obtain the CPG API connection: %s (%d)",
   831                 cs_strerror(rc), rc);
   837                                             &found_uid, &found_gid))) {
   838         crm_err(
"CPG provider is not authentic:"   839                 " process %lld (uid: %lld, gid: %lld)",
   841                 (
long long) found_uid, (
long long) found_gid);
   845         crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
   853         crm_err(
"Could not get local node id from the CPG API");
   860     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
   862         crm_err(
"Could not join the CPG group '%s': %d", message_name, rc);
   866     pcmk_cpg_handle = handle;
   867     cluster->cpg_handle = handle;
   872         cpg_finalize(handle);
   919     static int msg_id = 0;
   920     static int local_pid = 0;
   921     static int local_name_len = 0;
   922     static const char *local_name = NULL;
   933             crm_err(
"Invalid message class: %d", msg_class);
   939     if (local_name == NULL) {
   942     if ((local_name_len == 0) && (local_name != NULL)) {
   943         local_name_len = strlen(local_name);
   950     if (local_pid == 0) {
   951         local_pid = getpid();
   962     msg->header.id = msg_class;
   963     msg->header.error = CS_OK;
   965     msg->host.type = dest;
   966     msg->host.local = 
local;
   971             msg->host.size = strlen(node->
uname);
   972             memset(msg->host.uname, 0, 
MAX_NAME);
   973             memcpy(msg->host.uname, node->
uname, msg->host.size);
   977         msg->host.id = node->
id;
   983     msg->sender.type = 
sender;
   984     msg->sender.pid = local_pid;
   985     msg->sender.size = local_name_len;
   986     memset(msg->sender.uname, 0, 
MAX_NAME);
   987     if ((local_name != NULL) && (msg->sender.size != 0)) {
   988         memcpy(msg->sender.uname, local_name, msg->sender.size);
   991     msg->size = 1 + strlen(
data);
   995         msg = pcmk__realloc(msg, msg->header.size);
   996         memcpy(msg->data, 
data, msg->size);
   999         char *compressed = NULL;
  1000         unsigned int new_size = 0;
  1001         char *uncompressed = strdup(
data);
  1007             msg = pcmk__realloc(msg, msg->header.size);
  1008             memcpy(msg->data, compressed, new_size);
  1010             msg->is_compressed = TRUE;
  1011             msg->compressed_size = new_size;
  1016             msg = pcmk__realloc(msg, msg->header.size);
  1017             memcpy(msg->data, 
data, msg->size);
  1024     iov = calloc(1, 
sizeof(
struct iovec));
  1025     iov->iov_base = msg;
  1026     iov->iov_len = msg->header.size;
  1028     if (msg->compressed_size) {
  1029         crm_trace(
"Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
  1030                   msg->id, 
target, (
unsigned long long) iov->iov_len,
  1031                   msg->compressed_size, 
data);
  1033         crm_trace(
"Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
  1034                   msg->id, 
target, (
unsigned long long) iov->iov_len,
  1039     cs_message_queue = g_list_append(cs_message_queue, iov);
  1040     crm_cs_flush(&pcmk_cpg_handle);
  1082         int scan_rc = sscanf(text, 
"%d", &
type);
 
#define CRM_CHECK(expr, failure_action)
 
#define cs_repeat(rc, counter, max, code)
 
#define crm_notice(fmt, args...)
 
const char * bz2_strerror(int rc)
 
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
 
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
 
const char * get_local_node_name(void)
Get the local node's name. 
 
void(* destroy)(gpointer)
 
#define PCMK__SPECIAL_PID_AS_0(p)
 
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
 
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry. 
 
enum crm_ais_msg_types type
 
char * strerror(int errnum)
 
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
 
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message. 
 
Wrappers for and extensions to glib mainloop. 
 
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG. 
 
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready. 
 
struct pcmk__cpg_host_s pcmk__cpg_host_t
 
#define crm_warn(fmt, args...)
 
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information. 
 
#define crm_debug(fmt, args...)
 
#define crm_trace(fmt, args...)
 
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
 
#define do_crm_log(level, fmt, args...)
Log a message. 
 
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
 
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
 
#define CRM_SYSTEM_PENGINE
 
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
 
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
 
struct pcmk__cpg_host_s __attribute__((packed))
 
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes) 
 
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages. 
 
#define CRM_SYSTEM_STONITHD
 
#define CRM_SYSTEM_TENGINE
 
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG) 
 
#define crm_err(fmt, args...)
 
#define G_PRIORITY_MEDIUM
 
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string. 
 
#define CRM_BZ2_THRESHOLD
 
char * dump_xml_unformatted(xmlNode *msg)
 
#define msg_data_len(msg)
 
#define pcmk__plural_s(i)
 
IPC interface to Pacemaker daemons. 
 
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event. 
 
#define crm_info(fmt, args...)
 
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG. 
 
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)