12 #include <arpa/inet.h>    15 #include <netinet/in.h>    18 #include <sys/socket.h>    19 #include <sys/types.h>                      20 #include <sys/utsname.h>    23 #include <corosync/corodefs.h>    24 #include <corosync/corotypes.h>    25 #include <corosync/hdb.h>    26 #include <corosync/cpg.h>    27 #include <qb/qbipc_common.h>    28 #include <qb/qbipcc.h>    29 #include <qb/qbutil.h>    42 static cpg_handle_t pcmk_cpg_handle = 0;
    45 static bool cpg_evicted = 
false;
    46 static GList *cs_message_queue = NULL;
    47 static int cs_message_timer = 0;
    54 struct pcmk__cpg_host_s {
    65 struct pcmk__cpg_msg_s {
    66     struct qb_ipc_response_header header 
__attribute__ ((aligned(8)));
    82 static void crm_cs_flush(gpointer 
data);
    84 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)    86 #define cs_repeat(rc, counter, max, code) do {                          \    88         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \    90             crm_debug("Retrying operation after %ds", counter);         \    95     } while (counter < max)   108     cs_error_t rc = CS_OK;
   110     static uint32_t local_nodeid = 0;
   111     cpg_handle_t local_handle = handle;
   112     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
   119     if (local_nodeid != 0) {
   126                   cpg_model_initialize(&local_handle, CPG_MODEL_V1,
   127                                        (cpg_model_data_t *) &cpg_model_info,
   130             crm_err(
"Could not connect to the CPG API: %s (%d)",
   131                     cs_strerror(rc), rc);
   135         rc = cpg_fd_get(local_handle, &fd);
   137             crm_err(
"Could not obtain the CPG API connection: %s (%d)",
   138                     cs_strerror(rc), rc);
   144                                           &found_uid, &found_gid);
   146             crm_err(
"CPG provider is not authentic:"   147                     " process %lld (uid: %lld, gid: %lld)",
   149                     (
long long) found_uid, (
long long) found_gid);
   153             crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
   162         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
   166         crm_err(
"Could not get local node id from the CPG API: %s (%d)",
   167                 pcmk__cs_err_str(rc), rc);
   173         cpg_finalize(local_handle);
   175     crm_debug(
"Local nodeid is %u", local_nodeid);
   188 crm_cs_flush_cb(gpointer 
data)
   190     cs_message_timer = 0;
   196 #define CS_SEND_MAX 200   205 crm_cs_flush(gpointer 
data)
   207     unsigned int sent = 0;
   210     cpg_handle_t *handle = (cpg_handle_t *) 
data;
   217     queue_len = g_list_length(cs_message_queue);
   218     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
   219         crm_err(
"CPG queue has grown to %d", queue_len);
   222         crm_warn(
"CPG queue has grown to %d", queue_len);
   225     if (cs_message_timer != 0) {
   227         crm_trace(
"Timer active %d", cs_message_timer);
   231     while ((cs_message_queue != NULL) && (sent < 
CS_SEND_MAX)) {
   232         struct iovec *iov = cs_message_queue->data;
   234         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
   241                   (
unsigned long long) iov->iov_len);
   243         cs_message_queue = g_list_remove(cs_message_queue, iov);
   250                "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
   254     if (cs_message_queue) {
   255         uint32_t delay_ms = 100;
   258             delay_ms = QB_MIN(1000, 
CS_SEND_MAX + (10 * queue_len));
   273 pcmk_cpg_dispatch(gpointer user_data)
   275     cs_error_t rc = CS_OK;
   278     rc = cpg_dispatch(cluster->
priv->cpg_handle, CS_DISPATCH_ONE);
   280         crm_err(
"Connection to the CPG API failed: %s (%d)",
   281                 pcmk__cs_err_str(rc), rc);
   282         cpg_finalize(cluster->
priv->cpg_handle);
   283         cluster->
priv->cpg_handle = 0;
   286     } 
else if (cpg_evicted) {
   287         crm_err(
"Evicted from CPG membership");
   293 static inline const char *
   296     return (
host->size > 0)? 
host->uname : 
"<all>";
   299 static inline const char *
   304     return pcmk__s(
name, 
"unknown");
   320     if (payload_size < 1) {
   321         crm_err(
"%sCPG message %d from %s invalid: "   322                 "Claimed size of %d bytes is too small "   323                 QB_XS 
" from %s[%u] to %s@%s",
   324                 (msg->is_compressed? 
"Compressed " : 
""),
   325                 msg->id, ais_dest(&(msg->sender)),
   326                 (
int) msg->header.size,
   327                 msg_type2text(msg->sender.type), msg->sender.pid,
   328                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   332     if (msg->header.error != CS_OK) {
   333         crm_err(
"%sCPG message %d from %s invalid: "   334                 "Sender indicated error %d "   335                 QB_XS 
" from %s[%u] to %s@%s",
   336                 (msg->is_compressed? 
"Compressed " : 
""),
   337                 msg->id, ais_dest(&(msg->sender)),
   339                 msg_type2text(msg->sender.type), msg->sender.pid,
   340                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   345         crm_err(
"%sCPG message %d from %s invalid: "   346                 "Total size %d inconsistent with payload size %d "   347                 QB_XS 
" from %s[%u] to %s@%s",
   348                 (msg->is_compressed? 
"Compressed " : 
""),
   349                 msg->id, ais_dest(&(msg->sender)),
   351                 msg_type2text(msg->sender.type), msg->sender.pid,
   352                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   356     if (!msg->is_compressed &&
   360         (((msg->size > 1) && (msg->data[msg->size - 2] == 
'\0'))
   361          || (msg->data[msg->size - 1] != 
'\0'))) {
   362         crm_err(
"CPG message %d from %s invalid: "   363                 "Payload does not end at byte %llu "   364                 QB_XS 
" from %s[%u] to %s@%s",
   365                 msg->id, ais_dest(&(msg->sender)),
   366                 (
unsigned long long) msg->size,
   367                 msg_type2text(msg->sender.type), msg->sender.pid,
   368                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   372     crm_trace(
"Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
   373               (
int) msg->header.size, (msg->is_compressed? 
"compressed " : 
""),
   374               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
   375               ais_dest(&(msg->sender)),
   376               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
   398                        void *content, 
const char **from)
   412         if (msg->sender.id == 0) {
   413             msg->sender.id = sender_id;
   414         } 
else if (msg->sender.id != sender_id) {
   415             crm_warn(
"Ignoring CPG message from ID %" PRIu32 
" PID %" PRIu32
   416                      ": claimed ID %" PRIu32,
   417                     sender_id, 
pid, msg->sender.id);
   422         if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
   423             crm_trace(
"Ignoring CPG message from ID %" PRIu32 
" PID %" PRIu32
   424                       ": for ID %" PRIu32 
" not %" PRIu32,
   425                       sender_id, 
pid, msg->host.id, local_nodeid);
   428         if ((msg->host.size > 0)
   431             crm_trace(
"Ignoring CPG message from ID %" PRIu32 
" PID %" PRIu32
   432                       ": for name %s not %s",
   433                       sender_id, 
pid, msg->host.uname, local_name);
   438         if (msg->sender.size == 0) {
   443             if (peer->
name == NULL) {
   444                 crm_debug(
"Received CPG message from node with ID %" PRIu32
   445                           " but its name is unknown", sender_id);
   447                 crm_debug(
"Updating name of CPG message sender with ID %" PRIu32
   448                           " to %s", sender_id, peer->
name);
   449                 msg->sender.size = strlen(peer->
name);
   450                 memset(msg->sender.uname, 0, 
MAX_NAME);
   451                 memcpy(msg->sender.uname, peer->
name, msg->sender.size);
   461         *from = msg->sender.uname;
   464     if (!check_message_sanity(msg)) {
   468     if (msg->is_compressed && (msg->size > 0)) {
   470         unsigned int new_size = msg->size + 1;
   473         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
   474                                         msg->compressed_size, 1, 0);
   476         if ((rc == 
pcmk_rc_ok) && (msg->size != new_size)) { 
   481             crm_warn(
"Ignoring compressed CPG message %d from %s (ID %" PRIu32
   482                     " PID %" PRIu32 
"): %s",
   483                      msg->id, ais_dest(&(msg->sender)), sender_id, 
pid,
   493     crm_trace(
"Received %sCPG message %d from %s (ID %" PRIu32
   494               " PID %" PRIu32 
"): %.40s...",
   495               (msg->is_compressed? 
"compressed " : 
""),
   496               msg->id, ais_dest(&(msg->sender)), sender_id, 
pid, msg->data);
   512 cmp_member_list_nodeid(
const void *first, 
const void *second)
   514     const struct cpg_address *
const a = *((
const struct cpg_address **) first),
   515                              *
const b = *((
const struct cpg_address **) second);
   516     if (a->nodeid < b->nodeid) {
   518     } 
else if (a->nodeid > b->nodeid) {
   534 cpgreason2str(cpg_reason_t reason)
   537         case CPG_REASON_JOIN:       
return " via cpg_join";
   538         case CPG_REASON_LEAVE:      
return " via cpg_leave";
   539         case CPG_REASON_NODEDOWN:   
return " via cluster exit";
   540         case CPG_REASON_NODEUP:     
return " via cluster join";
   541         case CPG_REASON_PROCDOWN:   
return " for unknown reason";
   555 static inline const char *
   558     return (peer != NULL)? pcmk__s(peer->
name, 
"peer node") : 
"unknown node";
   573 node_left(
const char *cpg_group_name, 
int event_counter,
   574           uint32_t local_nodeid, 
const struct cpg_address *cpg_peer,
   575           const struct cpg_address **sorted_member_list,
   576           size_t member_list_entries)
   581     const struct cpg_address **rival = NULL;
   596         rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
   597                         sizeof(
const struct cpg_address *),
   598                         cmp_member_list_nodeid);
   602         crm_info(
"Group %s event %d: %s (node %u pid %u) left%s",
   603                  cpg_group_name, event_counter, peer_name(peer),
   604                  cpg_peer->nodeid, cpg_peer->pid,
   605                  cpgreason2str(cpg_peer->reason));
   610     } 
else if (cpg_peer->nodeid == local_nodeid) {
   611         crm_warn(
"Group %s event %d: duplicate local pid %u left%s",
   612                  cpg_group_name, event_counter,
   613                  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
   616                  "%s (node %u) duplicate pid %u left%s (%u remains)",
   617                  cpg_group_name, event_counter, peer_name(peer),
   618                  cpg_peer->nodeid, cpg_peer->pid,
   619                  cpgreason2str(cpg_peer->reason), (*rival)->pid);
   641                      const struct cpg_name *group_name,
   642                      const struct cpg_address *member_list,
   643                      size_t member_list_entries,
   644                      const struct cpg_address *left_list,
   645                      size_t left_list_entries,
   646                      const struct cpg_address *joined_list,
   647                      size_t joined_list_entries)
   649     static int counter = 0;
   653     const struct cpg_address **sorted = NULL;
   656                                 sizeof(
const struct cpg_address *));
   658     for (
size_t iter = 0; iter < member_list_entries; iter++) {
   659         sorted[iter] = member_list + iter;
   663     qsort(sorted, member_list_entries, 
sizeof(
const struct cpg_address *),
   664           cmp_member_list_nodeid);
   666     for (
int i = 0; i < left_list_entries; i++) {
   667         node_left(group_name->value, counter, local_nodeid, &left_list[i],
   668                   sorted, member_list_entries);
   673     for (
int i = 0; i < joined_list_entries; i++) {
   674         crm_info(
"Group %s event %d: node %u pid %u joined%s",
   675                  group_name->value, counter, joined_list[i].nodeid,
   676                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
   679     for (
int i = 0; i < member_list_entries; i++) {
   684         if (member_list[i].nodeid == local_nodeid
   685                 && member_list[i].
pid != getpid()) {
   687             crm_warn(
"Group %s event %d: detected duplicate local pid %u",
   688                      group_name->value, counter, member_list[i].pid);
   691         crm_info(
"Group %s event %d: %s (node %u pid %u) is member",
   692                  group_name->value, counter, peer_name(peer),
   693                  member_list[i].nodeid, member_list[i].
pid);
   709             time_t now = time(NULL);
   715             } 
else if (now > (peer->
when_lost + 60)) {
   717                 crm_warn(
"Node %u is member of group %s but was believed "   719                          member_list[i].nodeid, group_name->value);
   724         if (local_nodeid == member_list[i].nodeid) {
   730         crm_err(
"Local node was evicted from group %s", group_name->value);
   748     if (cluster == NULL) {
   751     cluster->cpg.cpg_deliver_fn = fn;
   766     if (cluster == NULL) {
   769     cluster->cpg.cpg_confchg_fn = fn;
   788     cpg_handle_t handle = 0;
   789     const char *cpg_group_name = NULL;
   800     cpg_model_v1_data_t cpg_model_info = {
   801         .model = CPG_MODEL_V1,
   802         .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
   803         .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
   804         .cpg_totem_confchg_fn = NULL,
   811     if (cpg_group_name == NULL) {
   818     memset(cluster->
priv->group.value, 0, 128);
   819     strncpy(cluster->
priv->group.value, cpg_group_name, 127);
   820     cluster->
priv->group.length = strlen(cluster->
priv->group.value) + 1;
   822     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
   824         crm_err(
"Could not connect to the CPG API: %s (%d)",
   825                 cs_strerror(rc), rc);
   829     rc = cpg_fd_get(handle, &fd);
   831         crm_err(
"Could not obtain the CPG API connection: %s (%d)",
   832                 cs_strerror(rc), rc);
   838                                             &found_uid, &found_gid))) {
   839         crm_err(
"CPG provider is not authentic:"   840                 " process %lld (uid: %lld, gid: %lld)",
   842                 (
long long) found_uid, (
long long) found_gid);
   846         crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
   854         crm_err(
"Could not get local node id from the CPG API");
   861     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->
priv->group));
   863         crm_err(
"Could not join the CPG group '%s': %d", cpg_group_name, rc);
   867     pcmk_cpg_handle = handle;
   868     cluster->
priv->cpg_handle = handle;
   873         cpg_finalize(handle);
   893     if (cluster->
priv->cpg_handle != 0) {
   895         cpg_leave(cluster->
priv->cpg_handle, &cluster->
priv->group);
   896         cpg_finalize(cluster->
priv->cpg_handle);
   897         cluster->
priv->cpg_handle = 0;
   918     static int msg_id = 0;
   919     static int local_pid = 0;
   920     static int local_name_len = 0;
   921     static const char *local_name = NULL;
   927     if (local_name == NULL) {
   930     if ((local_name_len == 0) && (local_name != NULL)) {
   931         local_name_len = strlen(local_name);
   938     if (local_pid == 0) {
   939         local_pid = getpid();
   946     msg->header.error = CS_OK;
   948     msg->host.type = dest;
   951         if (node->
name != NULL) {
   953             msg->host.size = strlen(node->
name);
   954             memset(msg->host.uname, 0, 
MAX_NAME);
   955             memcpy(msg->host.uname, node->
name, msg->host.size);
   968     msg->sender.pid = local_pid;
   969     msg->sender.size = local_name_len;
   970     memset(msg->sender.uname, 0, 
MAX_NAME);
   972     if ((local_name != NULL) && (msg->sender.size != 0)) {
   973         memcpy(msg->sender.uname, local_name, msg->sender.size);
   976     msg->size = 1 + strlen(
data);
   980         msg = pcmk__realloc(msg, msg->header.size);
   981         memcpy(msg->data, 
data, msg->size);
   984         char *compressed = NULL;
   985         unsigned int new_size = 0;
   991             msg = pcmk__realloc(msg, msg->header.size);
   992             memcpy(msg->data, compressed, new_size);
   994             msg->is_compressed = TRUE;
   995             msg->compressed_size = new_size;
  1000             msg = pcmk__realloc(msg, msg->header.size);
  1001             memcpy(msg->data, 
data, msg->size);
  1008     iov->iov_base = msg;
  1009     iov->iov_len = msg->header.size;
  1011     if (msg->compressed_size > 0) {
  1012         crm_trace(
"Queueing CPG message %u to %s "  1013                   "(%llu bytes, %d bytes compressed payload): %.200s",
  1014                   msg->id, 
target, (
unsigned long long) iov->iov_len,
  1015                   msg->compressed_size, 
data);
  1017         crm_trace(
"Queueing CPG message %u to %s "  1018                   "(%llu bytes, %d bytes payload): %.200s",
  1019                   msg->id, 
target, (
unsigned long long) iov->iov_len,
  1025     cs_message_queue = g_list_append(cs_message_queue, iov);
  1026     crm_cs_flush(&pcmk_cpg_handle);
  1046     GString *
data = g_string_sized_new(1024);
  1050     rc = send_cpg_text(
data->str, node, dest);
  1051     g_string_free(
data, TRUE);
 
pcmk__node_status_t * pcmk__get_node(unsigned int id, const char *uname, const char *xml_id, uint32_t flags)
 
#define cs_repeat(rc, counter, max, code)
 
uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle)
 
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
 
#define CRM_BZ2_THRESHOLD
 
enum pcmk_ipc_server type
 
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
 
Search for cluster nodes from membership cache. 
 
#define PCMK__SPECIAL_PID_AS_0(p)
 
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
 
const char * pcmk__cluster_local_node_name(void)
 
const char * pcmk__server_message_type(enum pcmk_ipc_server server)
 
void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
 
bool pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node, enum pcmk_ipc_server dest)
 
uint32_t cluster_layer_id
Cluster-layer numeric node ID. 
 
pcmk__node_status_t * pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
 
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code. 
 
enum pcmk_ipc_server server
Server this connection is for (if any) 
 
Wrappers for and extensions to glib mainloop. 
 
int pcmk__cpg_connect(pcmk_cluster_t *cluster)
Connect to Corosync CPG. 
 
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready. 
 
struct pcmk__cpg_host_s pcmk__cpg_host_t
 
#define crm_warn(fmt, args...)
 
pcmk_ipc_server
Available IPC interfaces. 
 
void(* destroy)(gpointer)
 
#define crm_debug(fmt, args...)
 
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
 
#define crm_trace(fmt, args...)
 
#define do_crm_log(level, fmt, args...)
Log a message. 
 
#define PCMK_VALUE_MEMBER
 
int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
Set the CPG deliver callback function for a cluster object. 
 
Wrappers for and extensions to libxml2. 
 
enum pcmk_ipc_server pcmk__parse_server(const char *text)
 
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
 
int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
Set the CPG config change callback function for a cluster object. 
 
uint32_t node_id
Local node ID at cluster layer. 
 
#define pcmk__str_copy(str)
 
struct pcmk__cpg_host_s __attribute__((packed))
 
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes) 
 
#define PCMK_VALUE_ONLINE
 
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code. 
 
void pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
 
#define crm_err(fmt, args...)
 
#define G_PRIORITY_MEDIUM
 
pcmk__node_status_t * pcmk__update_peer_state(const char *source, pcmk__node_status_t *node, const char *state, uint64_t membership)
Update a node's state and membership information. 
 
#define msg_data_len(msg)
 
#define pcmk__plural_s(i)
 
IPC interface to Pacemaker daemons. 
 
time_t when_lost
When CPG membership was last lost. 
 
#define pcmk__assert_alloc(nmemb, size)
 
pcmk__node_status_t * crm_update_peer_proc(const char *source, pcmk__node_status_t *peer, uint32_t flag, const char *status)
 
#define PCMK_VALUE_OFFLINE
 
#define crm_info(fmt, args...)
 
pcmk__cluster_private_t * priv
 
Node status data (may be a cluster node or a Pacemaker Remote node) 
 
char * name
Node name as known to cluster layer, or Pacemaker Remote node name. 
 
char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from)
 
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
 
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)