root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. pcmk__cpg_local_nodeid
  2. crm_cs_flush_cb
  3. crm_cs_flush
  4. pcmk_cpg_dispatch
  5. ais_dest
  6. msg_type2text
  7. check_message_sanity
  8. pcmk__cpg_message_data
  9. cmp_member_list_nodeid
  10. cpgreason2str
  11. peer_name
  12. node_left
  13. pcmk__cpg_confchg_cb
  14. pcmk_cpg_set_deliver_fn
  15. pcmk_cpg_set_confchg_fn
  16. pcmk__cpg_connect
  17. pcmk__cpg_disconnect
  18. send_cpg_text
  19. pcmk__cpg_send_xml

   1 /*
   2  * Copyright 2004-2024 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <arpa/inet.h>
  13 #include <inttypes.h>                   // PRIu32
  14 #include <netdb.h>
  15 #include <netinet/in.h>
  16 #include <stdbool.h>
  17 #include <stdint.h>                     // uint32_t
  18 #include <sys/socket.h>
  19 #include <sys/types.h>                  // size_t
  20 #include <sys/utsname.h>
  21 
  22 #include <bzlib.h>
  23 #include <corosync/corodefs.h>
  24 #include <corosync/corotypes.h>
  25 #include <corosync/hdb.h>
  26 #include <corosync/cpg.h>
  27 #include <qb/qbipc_common.h>
  28 #include <qb/qbipcc.h>
  29 #include <qb/qbutil.h>
  30 
  31 #include <crm/cluster/internal.h>
  32 #include <crm/common/ipc.h>
  33 #include <crm/common/ipc_internal.h>    // PCMK__SPECIAL_PID
  34 #include <crm/common/mainloop.h>
  35 #include <crm/common/xml.h>
  36 
  37 #include "crmcluster_private.h"
  38 
  39 /* @TODO Once we can update the public API to require pcmk_cluster_t* in more
  40  *       functions, we can ditch this in favor of cluster->cpg_handle.
  41  */
  42 static cpg_handle_t pcmk_cpg_handle = 0;
  43 
  44 // @TODO These could be moved to pcmk_cluster_t* at that time as well
     // Set by pcmk__cpg_confchg_cb() when the local node is missing from the
     // member list; while set, pcmk_cpg_dispatch() returns an error
  45 static bool cpg_evicted = false;
     // Outgoing messages (struct iovec *) that crm_cs_flush() has not sent yet
  46 static GList *cs_message_queue = NULL;
     // ID of the timer that will re-run crm_cs_flush(), or 0 if none is pending
  47 static int cs_message_timer = 0;
  48 
  49 /* @COMPAT Any changes to these structs (other than renames) will break all
  50  * rolling upgrades, and should be avoided if possible or done at a major
  51  * version bump if not
  52  */
  53 
     // One endpoint (sender or destination) of a CPG message
  54 struct pcmk__cpg_host_s {
         // Corosync node ID. For a destination, 0 means "not restricted by ID";
         // for a sender, 0 is replaced by the receiver with the delivering ID.
  55     uint32_t id;
         // Process ID (only logged by the code in this file)
  56     uint32_t pid;
  57     gboolean local;             // Unused but needed for compatibility
  58     enum pcmk_ipc_server type;  // For logging only
         // Length of uname, or 0 if uname is not set
  59     uint32_t size;
         // Node name (meaningful only when size is nonzero)
  60     char uname[MAX_NAME];
  61 } __attribute__ ((packed));
  62 
  63 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
  64 
     // A complete CPG message: fixed-size part followed by the payload bytes
  65 struct pcmk__cpg_msg_s {
         // header.size is the total message size (fixed part plus payload);
         // header.error must be CS_OK for the message to be accepted
  66     struct qb_ipc_response_header header __attribute__ ((aligned(8)));
         // Message ID (used in log messages)
  67     uint32_t id;
         // Whether data holds bzip2-compressed rather than plain text
  68     gboolean is_compressed;
  69 
         // Intended recipient
  70     pcmk__cpg_host_t host;
         // Originator
  71     pcmk__cpg_host_t sender;
  72 
         // Payload length when not compressed (the last counted byte must be
         // '\0'); expected decompressed length when compressed
  73     uint32_t size;
         // Number of bytes in data when is_compressed is set
  74     uint32_t compressed_size;
  75     /* 584 bytes */
         // Payload; its length is given by msg_data_len()
  76     char data[0];
  77 
  78 } __attribute__ ((packed));
  79 
  80 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
  81 
  82 static void crm_cs_flush(gpointer data);
  83 
  84 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
  85 
  86 #define cs_repeat(rc, counter, max, code) do {                          \
  87         rc = code;                                                      \
  88         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \
  89             counter++;                                                  \
  90             crm_debug("Retrying operation after %ds", counter);         \
  91             sleep(counter);                                             \
  92         } else {                                                        \
  93             break;                                                      \
  94         }                                                               \
  95     } while (counter < max)
  96 
  97 /*!
  98  * \internal
  99  * \brief Get the local Corosync node ID (via CPG)
 100  *
 101  * \param[in] handle  CPG connection to use (or 0 to use new connection)
 102  *
 103  * \return Corosync ID of local node (or 0 if not known)
 104  */
 105 uint32_t
 106 pcmk__cpg_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
 107 {
 108     cs_error_t rc = CS_OK;
 109     int retries = 0;
 110     static uint32_t local_nodeid = 0;
 111     cpg_handle_t local_handle = handle;
 112     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
 113     int fd = -1;
 114     uid_t found_uid = 0;
 115     gid_t found_gid = 0;
 116     pid_t found_pid = 0;
 117     int rv = 0;
 118 
 119     if (local_nodeid != 0) {
 120         return local_nodeid;
 121     }
 122 
 123     if (handle == 0) {
 124         crm_trace("Creating connection");
 125         cs_repeat(rc, retries, 5,
 126                   cpg_model_initialize(&local_handle, CPG_MODEL_V1,
 127                                        (cpg_model_data_t *) &cpg_model_info,
 128                                        NULL));
 129         if (rc != CS_OK) {
 130             crm_err("Could not connect to the CPG API: %s (%d)",
 131                     cs_strerror(rc), rc);
 132             return 0;
 133         }
 134 
 135         rc = cpg_fd_get(local_handle, &fd);
 136         if (rc != CS_OK) {
 137             crm_err("Could not obtain the CPG API connection: %s (%d)",
 138                     cs_strerror(rc), rc);
 139             goto bail;
 140         }
 141 
 142         // CPG provider run as root (at least in given user namespace)?
 143         rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
 144                                           &found_uid, &found_gid);
 145         if (rv == 0) {
 146             crm_err("CPG provider is not authentic:"
 147                     " process %lld (uid: %lld, gid: %lld)",
 148                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 149                     (long long) found_uid, (long long) found_gid);
 150             goto bail;
 151 
 152         } else if (rv < 0) {
 153             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 154                     strerror(-rv), -rv);
 155             goto bail;
 156         }
 157     }
 158 
 159     if (rc == CS_OK) {
 160         retries = 0;
 161         crm_trace("Performing lookup");
 162         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
 163     }
 164 
 165     if (rc != CS_OK) {
 166         crm_err("Could not get local node id from the CPG API: %s (%d)",
 167                 pcmk__cs_err_str(rc), rc);
 168     }
 169 
 170 bail:
 171     if (handle == 0) {
 172         crm_trace("Closing connection");
 173         cpg_finalize(local_handle);
 174     }
 175     crm_debug("Local nodeid is %u", local_nodeid);
 176     return local_nodeid;
 177 }
 178 
 179 /*!
 180  * \internal
 181  * \brief Callback function for Corosync message queue timer
 182  *
 183  * \param[in] data  CPG handle
 184  *
 185  * \return FALSE (to indicate to glib that timer should not be removed)
 186  */
 187 static gboolean
 188 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 189 {
 190     cs_message_timer = 0;
 191     crm_cs_flush(data);
 192     return FALSE;
 193 }
 194 
 195 // Send no more than this many CPG messages in one flush
 196 #define CS_SEND_MAX 200
 197 
 198 /*!
 199  * \internal
 200  * \brief Send messages in Corosync CPG message queue
 201  *
 202  * \param[in] data   CPG handle
 203  */
 204 static void
 205 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 206 {
 207     unsigned int sent = 0;
 208     guint queue_len = 0;
 209     cs_error_t rc = 0;
 210     cpg_handle_t *handle = (cpg_handle_t *) data;
 211 
 212     if (*handle == 0) {
 213         crm_trace("Connection is dead");
 214         return;
 215     }
 216 
 217     queue_len = g_list_length(cs_message_queue);
 218     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
 219         crm_err("CPG queue has grown to %d", queue_len);
 220 
 221     } else if (queue_len == CS_SEND_MAX) {
 222         crm_warn("CPG queue has grown to %d", queue_len);
 223     }
 224 
 225     if (cs_message_timer != 0) {
 226         /* There is already a timer, wait until it goes off */
 227         crm_trace("Timer active %d", cs_message_timer);
 228         return;
 229     }
 230 
 231     while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
 232         struct iovec *iov = cs_message_queue->data;
 233 
 234         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 235         if (rc != CS_OK) {
 236             break;
 237         }
 238 
 239         sent++;
 240         crm_trace("CPG message sent, size=%llu",
 241                   (unsigned long long) iov->iov_len);
 242 
 243         cs_message_queue = g_list_remove(cs_message_queue, iov);
 244         free(iov->iov_base);
 245         free(iov);
 246     }
 247 
 248     queue_len -= sent;
 249     do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
 250                "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
 251                sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
 252                (int) rc);
 253 
 254     if (cs_message_queue) {
 255         uint32_t delay_ms = 100;
 256         if (rc != CS_OK) {
 257             /* Proportionally more if sending failed but cap at 1s */
 258             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 259         }
 260         cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
 261     }
 262 }
 263 
 264 /*!
 265  * \internal
 266  * \brief Dispatch function for CPG handle
 267  *
 268  * \param[in,out] user_data  Cluster object
 269  *
 270  * \return 0 on success, -1 on error (per mainloop_io_t interface)
 271  */
 272 static int
 273 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 274 {
 275     cs_error_t rc = CS_OK;
 276     pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
 277 
 278     rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
 279     if (rc != CS_OK) {
 280         crm_err("Connection to the CPG API failed: %s (%d)",
 281                 pcmk__cs_err_str(rc), rc);
 282         cpg_finalize(cluster->priv->cpg_handle);
 283         cluster->priv->cpg_handle = 0;
 284         return -1;
 285 
 286     } else if (cpg_evicted) {
 287         crm_err("Evicted from CPG membership");
 288         return -1;
 289     }
 290     return 0;
 291 }
 292 
 293 static inline const char *
 294 ais_dest(const pcmk__cpg_host_t *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 295 {
 296     return (host->size > 0)? host->uname : "<all>";
 297 }
 298 
 299 static inline const char *
 300 msg_type2text(enum pcmk_ipc_server type)
     /* [previous][next][first][last][top][bottom][index][help] */
 301 {
 302     const char *name = pcmk__server_message_type(type);
 303 
 304     return pcmk__s(name, "unknown");
 305 }
 306 
 307 /*!
 308  * \internal
 309  * \brief Check whether a Corosync CPG message is valid
 310  *
 311  * \param[in] msg   Corosync CPG message to check
 312  *
 313  * \return true if \p msg is valid, otherwise false
 314  */
 315 static bool
 316 check_message_sanity(const pcmk__cpg_msg_t *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 317 {
 318     int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
 319 
 320     if (payload_size < 1) {
 321         crm_err("%sCPG message %d from %s invalid: "
 322                 "Claimed size of %d bytes is too small "
 323                 QB_XS " from %s[%u] to %s@%s",
 324                 (msg->is_compressed? "Compressed " : ""),
 325                 msg->id, ais_dest(&(msg->sender)),
 326                 (int) msg->header.size,
 327                 msg_type2text(msg->sender.type), msg->sender.pid,
 328                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 329         return false;
 330     }
 331 
 332     if (msg->header.error != CS_OK) {
 333         crm_err("%sCPG message %d from %s invalid: "
 334                 "Sender indicated error %d "
 335                 QB_XS " from %s[%u] to %s@%s",
 336                 (msg->is_compressed? "Compressed " : ""),
 337                 msg->id, ais_dest(&(msg->sender)),
 338                 msg->header.error,
 339                 msg_type2text(msg->sender.type), msg->sender.pid,
 340                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 341         return false;
 342     }
 343 
 344     if (msg_data_len(msg) != payload_size) {
 345         crm_err("%sCPG message %d from %s invalid: "
 346                 "Total size %d inconsistent with payload size %d "
 347                 QB_XS " from %s[%u] to %s@%s",
 348                 (msg->is_compressed? "Compressed " : ""),
 349                 msg->id, ais_dest(&(msg->sender)),
 350                 (int) msg->header.size, (int) msg_data_len(msg),
 351                 msg_type2text(msg->sender.type), msg->sender.pid,
 352                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 353         return false;
 354     }
 355 
 356     if (!msg->is_compressed &&
 357         /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
 358          * but checking the last byte or two should be quick
 359          */
 360         (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
 361          || (msg->data[msg->size - 1] != '\0'))) {
 362         crm_err("CPG message %d from %s invalid: "
 363                 "Payload does not end at byte %llu "
 364                 QB_XS " from %s[%u] to %s@%s",
 365                 msg->id, ais_dest(&(msg->sender)),
 366                 (unsigned long long) msg->size,
 367                 msg_type2text(msg->sender.type), msg->sender.pid,
 368                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 369         return false;
 370     }
 371 
 372     crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
 373               (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
 374               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
 375               ais_dest(&(msg->sender)),
 376               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 377     return true;
 378 }
 379 
 380 /*!
 381  * \internal
 382  * \brief Extract text data from a Corosync CPG message
 383  *
 384  * \param[in]     handle     CPG connection (to get local node ID if not known)
 385  * \param[in]     sender_id  Corosync ID of node that sent message
 386  * \param[in]     pid        Process ID of message sender (for logging only)
 387  * \param[in,out] content    CPG message
 388  * \param[out]    from       If not \c NULL, will be set to sender uname
 389  *                           (valid for the lifetime of \p content)
 390  *
 391  * \return Newly allocated string with message data, or NULL for errors and
 392  *         messages not intended for the local node
 393  *
 394  * \note The caller is responsible for freeing the return value using \c free().
 395  */
 396 char *
 397 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
     /* [previous][next][first][last][top][bottom][index][help] */
 398                        void *content, const char **from)
 399 {
 400     char *data = NULL;
 401     pcmk__cpg_msg_t *msg = content;
 402 
 403     if (from != NULL) {
 404         *from = NULL;
 405     }
 406 
 407     if (handle != 0) {
 408         uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
 409         const char *local_name = pcmk__cluster_local_node_name();
 410 
 411         // Update or validate message sender ID
 412         if (msg->sender.id == 0) {
 413             msg->sender.id = sender_id;
 414         } else if (msg->sender.id != sender_id) {
 415             crm_warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
 416                      ": claimed ID %" PRIu32,
 417                     sender_id, pid, msg->sender.id);
 418             return NULL;
 419         }
 420 
 421         // Ignore messages that aren't for the local node
 422         if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
 423             crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
 424                       ": for ID %" PRIu32 " not %" PRIu32,
 425                       sender_id, pid, msg->host.id, local_nodeid);
 426             return NULL;
 427         }
 428         if ((msg->host.size > 0)
 429             && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 430 
 431             crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
 432                       ": for name %s not %s",
 433                       sender_id, pid, msg->host.uname, local_name);
 434             return NULL;
 435         }
 436 
 437         // Add sender name if not in original message
 438         if (msg->sender.size == 0) {
 439             const pcmk__node_status_t *peer =
 440                 pcmk__get_node(sender_id, NULL, NULL,
 441                                pcmk__node_search_cluster_member);
 442 
 443             if (peer->name == NULL) {
 444                 crm_debug("Received CPG message from node with ID %" PRIu32
 445                           " but its name is unknown", sender_id);
 446             } else {
 447                 crm_debug("Updating name of CPG message sender with ID %" PRIu32
 448                           " to %s", sender_id, peer->name);
 449                 msg->sender.size = strlen(peer->name);
 450                 memset(msg->sender.uname, 0, MAX_NAME);
 451                 memcpy(msg->sender.uname, peer->name, msg->sender.size);
 452             }
 453         }
 454     }
 455 
 456     // Ensure sender is in peer cache (though it should already be)
 457     pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
 458                    pcmk__node_search_cluster_member);
 459 
 460     if (from != NULL) {
 461         *from = msg->sender.uname;
 462     }
 463 
 464     if (!check_message_sanity(msg)) {
 465         return NULL;
 466     }
 467 
 468     if (msg->is_compressed && (msg->size > 0)) {
 469         int rc = BZ_OK;
 470         unsigned int new_size = msg->size + 1;
 471         char *uncompressed = pcmk__assert_alloc(1, new_size);
 472 
 473         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
 474                                         msg->compressed_size, 1, 0);
 475         rc = pcmk__bzlib2rc(rc);
 476         if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug?
 477             rc = pcmk_rc_compression;
 478         }
 479         if (rc != pcmk_rc_ok) {
 480             free(uncompressed);
 481             crm_warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
 482                     " PID %" PRIu32 "): %s",
 483                      msg->id, ais_dest(&(msg->sender)), sender_id, pid,
 484                      pcmk_rc_str(rc));
 485             return NULL;
 486         }
 487         data = uncompressed;
 488 
 489     } else {
 490         data = pcmk__str_copy(msg->data);
 491     }
 492 
 493     crm_trace("Received %sCPG message %d from %s (ID %" PRIu32
 494               " PID %" PRIu32 "): %.40s...",
 495               (msg->is_compressed? "compressed " : ""),
 496               msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data);
 497     return data;
 498 }
 499 
 500 /*!
 501  * \internal
 502  * \brief Compare cpg_address objects by node ID
 503  *
 504  * \param[in] first   First cpg_address structure to compare
 505  * \param[in] second  Second cpg_address structure to compare
 506  *
 507  * \return Negative number if first's node ID is lower,
 508  *         positive number if first's node ID is greater,
 509  *         or 0 if both node IDs are equal
 510  */
 511 static int
 512 cmp_member_list_nodeid(const void *first, const void *second)
     /* [previous][next][first][last][top][bottom][index][help] */
 513 {
 514     const struct cpg_address *const a = *((const struct cpg_address **) first),
 515                              *const b = *((const struct cpg_address **) second);
 516     if (a->nodeid < b->nodeid) {
 517         return -1;
 518     } else if (a->nodeid > b->nodeid) {
 519         return 1;
 520     }
 521     /* don't bother with "reason" nor "pid" */
 522     return 0;
 523 }
 524 
 525 /*!
 526  * \internal
 527  * \brief Get a readable string equivalent of a cpg_reason_t value
 528  *
 529  * \param[in] reason  CPG reason value
 530  *
 531  * \return Readable string suitable for logging
 532  */
 533 static const char *
 534 cpgreason2str(cpg_reason_t reason)
     /* [previous][next][first][last][top][bottom][index][help] */
 535 {
 536     switch (reason) {
 537         case CPG_REASON_JOIN:       return " via cpg_join";
 538         case CPG_REASON_LEAVE:      return " via cpg_leave";
 539         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 540         case CPG_REASON_NODEUP:     return " via cluster join";
 541         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 542         default:                    break;
 543     }
 544     return "";
 545 }
 546 
 547 /*!
 548  * \internal
 549  * \brief Get a log-friendly node name
 550  *
 551  * \param[in] peer  Node to check
 552  *
 553  * \return Node's uname, or readable string if not known
 554  */
 555 static inline const char *
 556 peer_name(const pcmk__node_status_t *peer)
     /* [previous][next][first][last][top][bottom][index][help] */
 557 {
 558     return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
 559 }
 560 
 561 /*!
 562  * \internal
 563  * \brief Process a CPG peer's leaving the cluster
 564  *
 565  * \param[in] cpg_group_name      CPG group name (for logging)
 566  * \param[in] event_counter       Event number (for logging)
 567  * \param[in] local_nodeid        Node ID of local node
 568  * \param[in] cpg_peer            CPG peer that left
 569  * \param[in] sorted_member_list  List of remaining members, qsort()-ed by ID
 570  * \param[in] member_list_entries Number of entries in \p sorted_member_list
 571  */
 572 static void
 573 node_left(const char *cpg_group_name, int event_counter,
     /* [previous][next][first][last][top][bottom][index][help] */
 574           uint32_t local_nodeid, const struct cpg_address *cpg_peer,
 575           const struct cpg_address **sorted_member_list,
 576           size_t member_list_entries)
 577 {
 578     pcmk__node_status_t *peer =
 579         pcmk__search_node_caches(cpg_peer->nodeid, NULL,
 580                                  pcmk__node_search_cluster_member);
 581     const struct cpg_address **rival = NULL;
 582 
 583     /* Most CPG-related Pacemaker code assumes that only one process on a node
 584      * can be in the process group, but Corosync does not impose this
 585      * limitation, and more than one can be a member in practice due to a
 586      * daemon attempting to start while another instance is already running.
 587      *
 588      * Check for any such duplicate instances, because we don't want to process
 589      * their leaving as if our actual peer left. If the peer that left still has
 590      * an entry in sorted_member_list (with a different PID), we will ignore the
 591      * leaving.
 592      *
 593      * @TODO Track CPG members' PIDs so we can tell exactly who left.
 594      */
 595     if (peer != NULL) {
 596         rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
 597                         sizeof(const struct cpg_address *),
 598                         cmp_member_list_nodeid);
 599     }
 600 
 601     if (rival == NULL) {
 602         crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 603                  cpg_group_name, event_counter, peer_name(peer),
 604                  cpg_peer->nodeid, cpg_peer->pid,
 605                  cpgreason2str(cpg_peer->reason));
 606         if (peer != NULL) {
 607             crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 608                                  PCMK_VALUE_OFFLINE);
 609         }
 610     } else if (cpg_peer->nodeid == local_nodeid) {
 611         crm_warn("Group %s event %d: duplicate local pid %u left%s",
 612                  cpg_group_name, event_counter,
 613                  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
 614     } else {
 615         crm_warn("Group %s event %d: "
 616                  "%s (node %u) duplicate pid %u left%s (%u remains)",
 617                  cpg_group_name, event_counter, peer_name(peer),
 618                  cpg_peer->nodeid, cpg_peer->pid,
 619                  cpgreason2str(cpg_peer->reason), (*rival)->pid);
 620     }
 621 }
 622 
 623 /*!
 624  * \internal
 625  * \brief Handle a CPG configuration change event
 626  *
 627  * \param[in] handle               CPG connection
 628  * \param[in] group_name           CPG group name
 629  * \param[in] member_list          List of current CPG members
 630  * \param[in] member_list_entries  Number of entries in \p member_list
 631  * \param[in] left_list            List of CPG members that left
 632  * \param[in] left_list_entries    Number of entries in \p left_list
 633  * \param[in] joined_list          List of CPG members that joined
 634  * \param[in] joined_list_entries  Number of entries in \p joined_list
 635  *
 636  * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
 637  *       \c cpg_callbacks_t object.
 638  */
 639 void
 640 pcmk__cpg_confchg_cb(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 641                      const struct cpg_name *group_name,
 642                      const struct cpg_address *member_list,
 643                      size_t member_list_entries,
 644                      const struct cpg_address *left_list,
 645                      size_t left_list_entries,
 646                      const struct cpg_address *joined_list,
 647                      size_t joined_list_entries)
 648 {
 649     static int counter = 0;
 650 
 651     bool found = false;
 652     uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
 653     const struct cpg_address **sorted = NULL;
 654 
 655     sorted = pcmk__assert_alloc(member_list_entries,
 656                                 sizeof(const struct cpg_address *));
 657 
 658     for (size_t iter = 0; iter < member_list_entries; iter++) {
 659         sorted[iter] = member_list + iter;
 660     }
 661 
 662     // So that the cross-matching of multiply-subscribed nodes is then cheap
 663     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 664           cmp_member_list_nodeid);
 665 
 666     for (int i = 0; i < left_list_entries; i++) {
 667         node_left(group_name->value, counter, local_nodeid, &left_list[i],
 668                   sorted, member_list_entries);
 669     }
 670     free(sorted);
 671     sorted = NULL;
 672 
 673     for (int i = 0; i < joined_list_entries; i++) {
 674         crm_info("Group %s event %d: node %u pid %u joined%s",
 675                  group_name->value, counter, joined_list[i].nodeid,
 676                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 677     }
 678 
 679     for (int i = 0; i < member_list_entries; i++) {
 680         pcmk__node_status_t *peer =
 681             pcmk__get_node(member_list[i].nodeid, NULL, NULL,
 682                            pcmk__node_search_cluster_member);
 683 
 684         if (member_list[i].nodeid == local_nodeid
 685                 && member_list[i].pid != getpid()) {
 686             // See the note in node_left()
 687             crm_warn("Group %s event %d: detected duplicate local pid %u",
 688                      group_name->value, counter, member_list[i].pid);
 689             continue;
 690         }
 691         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 692                  group_name->value, counter, peer_name(peer),
 693                  member_list[i].nodeid, member_list[i].pid);
 694 
 695         /* If the caller left auto-reaping enabled, this will also update the
 696          * state to member.
 697          */
 698         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 699                                     PCMK_VALUE_ONLINE);
 700 
 701         if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
 702             /* The node is a CPG member, but we currently think it's not a
 703              * cluster member. This is possible only if auto-reaping was
 704              * disabled. The node may be joining, and we happened to get the CPG
 705              * notification before the quorum notification; or the node may have
 706              * just died, and we are processing its final messages; or a bug
 707              * has affected the peer cache.
 708              */
 709             time_t now = time(NULL);
 710 
 711             if (peer->when_lost == 0) {
 712                 // Track when we first got into this contradictory state
 713                 peer->when_lost = now;
 714 
 715             } else if (now > (peer->when_lost + 60)) {
 716                 // If it persists for more than a minute, update the state
 717                 crm_warn("Node %u is member of group %s but was believed "
 718                          "offline",
 719                          member_list[i].nodeid, group_name->value);
 720                 pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
 721             }
 722         }
 723 
 724         if (local_nodeid == member_list[i].nodeid) {
 725             found = true;
 726         }
 727     }
 728 
 729     if (!found) {
 730         crm_err("Local node was evicted from group %s", group_name->value);
 731         cpg_evicted = true;
 732     }
 733 
 734     counter++;
 735 }
 736 
 737 /*!
 738  * \brief Set the CPG deliver callback function for a cluster object
 739  *
 740  * \param[in,out] cluster  Cluster object
 741  * \param[in]     fn       Deliver callback function to set
 742  *
 743  * \return Standard Pacemaker return code
 744  */
 745 int
 746 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
     /* [previous][next][first][last][top][bottom][index][help] */
 747 {
 748     if (cluster == NULL) {
 749         return EINVAL;
 750     }
 751     cluster->cpg.cpg_deliver_fn = fn;
 752     return pcmk_rc_ok;
 753 }
 754 
 755 /*!
 756  * \brief Set the CPG config change callback function for a cluster object
 757  *
 758  * \param[in,out] cluster  Cluster object
 759  * \param[in]     fn       Configuration change callback function to set
 760  *
 761  * \return Standard Pacemaker return code
 762  */
 763 int
 764 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
     /* [previous][next][first][last][top][bottom][index][help] */
 765 {
 766     if (cluster == NULL) {
 767         return EINVAL;
 768     }
 769     cluster->cpg.cpg_confchg_fn = fn;
 770     return pcmk_rc_ok;
 771 }
 772 
 773 /*!
 774  * \brief Connect to Corosync CPG
 775  *
 776  * \param[in,out] cluster  Initialized cluster object to connect
 777  *
 778  * \return Standard Pacemaker return code
 779  */
 780 int
 781 pcmk__cpg_connect(pcmk_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 782 {
 783     cs_error_t rc;
 784     int fd = -1;
 785     int retries = 0;
 786     uint32_t id = 0;
 787     pcmk__node_status_t *peer = NULL;
 788     cpg_handle_t handle = 0;
 789     const char *cpg_group_name = NULL;
 790     uid_t found_uid = 0;
 791     gid_t found_gid = 0;
 792     pid_t found_pid = 0;
 793     int rv;
 794 
 795     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 796         .dispatch = pcmk_cpg_dispatch,
 797         .destroy = cluster->destroy,
 798     };
 799 
 800     cpg_model_v1_data_t cpg_model_info = {
 801             .model = CPG_MODEL_V1,
 802             .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 803             .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 804             .cpg_totem_confchg_fn = NULL,
 805             .flags = 0,
 806     };
 807 
 808     cpg_evicted = false;
 809 
 810     cpg_group_name = pcmk__server_message_type(cluster->priv->server);
 811     if (cpg_group_name == NULL) {
 812         /* The name will already be non-NULL for Pacemaker servers. If a
 813          * command-line tool or external caller connects to the cluster,
 814          * they will join this CPG group.
 815          */
 816         cpg_group_name = pcmk__s(crm_system_name, "unknown");
 817     }
 818     memset(cluster->priv->group.value, 0, 128);
 819     strncpy(cluster->priv->group.value, cpg_group_name, 127);
 820     cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;
 821 
 822     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
 823     if (rc != CS_OK) {
 824         crm_err("Could not connect to the CPG API: %s (%d)",
 825                 cs_strerror(rc), rc);
 826         goto bail;
 827     }
 828 
 829     rc = cpg_fd_get(handle, &fd);
 830     if (rc != CS_OK) {
 831         crm_err("Could not obtain the CPG API connection: %s (%d)",
 832                 cs_strerror(rc), rc);
 833         goto bail;
 834     }
 835 
 836     /* CPG provider run as root (in given user namespace, anyway)? */
 837     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 838                                             &found_uid, &found_gid))) {
 839         crm_err("CPG provider is not authentic:"
 840                 " process %lld (uid: %lld, gid: %lld)",
 841                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 842                 (long long) found_uid, (long long) found_gid);
 843         rc = CS_ERR_ACCESS;
 844         goto bail;
 845     } else if (rv < 0) {
 846         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 847                 strerror(-rv), -rv);
 848         rc = CS_ERR_ACCESS;
 849         goto bail;
 850     }
 851 
 852     id = pcmk__cpg_local_nodeid(handle);
 853     if (id == 0) {
 854         crm_err("Could not get local node id from the CPG API");
 855         goto bail;
 856 
 857     }
 858     cluster->priv->node_id = id;
 859 
 860     retries = 0;
 861     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
 862     if (rc != CS_OK) {
 863         crm_err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
 864         goto bail;
 865     }
 866 
 867     pcmk_cpg_handle = handle;
 868     cluster->priv->cpg_handle = handle;
 869     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 870 
 871   bail:
 872     if (rc != CS_OK) {
 873         cpg_finalize(handle);
 874         // @TODO Map rc to more specific Pacemaker return code
 875         return ENOTCONN;
 876     }
 877 
 878     peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
 879     crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
 880     return pcmk_rc_ok;
 881 }
 882 
 883 /*!
 884  * \internal
 885  * \brief Disconnect from Corosync CPG
 886  *
 887  * \param[in,out] cluster  Cluster object to disconnect
 888  */
 889 void
 890 pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 891 {
 892     pcmk_cpg_handle = 0;
 893     if (cluster->priv->cpg_handle != 0) {
 894         crm_trace("Disconnecting CPG");
 895         cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
 896         cpg_finalize(cluster->priv->cpg_handle);
 897         cluster->priv->cpg_handle = 0;
 898 
 899     } else {
 900         crm_info("No CPG connection");
 901     }
 902 }
 903 
 904 /*!
 905  * \internal
 906  * \brief Send string data via Corosync CPG
 907  *
 908  * \param[in] data   Data to send
 909  * \param[in] node   Cluster node to send message to
 910  * \param[in] dest   Type of message to send
 911  *
 912  * \return \c true on success, or \c false otherwise
 913  */
 914 static bool
 915 send_cpg_text(const char *data, const pcmk__node_status_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
 916               enum pcmk_ipc_server dest)
 917 {
 918     static int msg_id = 0;
 919     static int local_pid = 0;
 920     static int local_name_len = 0;
 921     static const char *local_name = NULL;
 922 
 923     char *target = NULL;
 924     struct iovec *iov;
 925     pcmk__cpg_msg_t *msg = NULL;
 926 
 927     if (local_name == NULL) {
 928         local_name = pcmk__cluster_local_node_name();
 929     }
 930     if ((local_name_len == 0) && (local_name != NULL)) {
 931         local_name_len = strlen(local_name);
 932     }
 933 
 934     if (data == NULL) {
 935         data = "";
 936     }
 937 
 938     if (local_pid == 0) {
 939         local_pid = getpid();
 940     }
 941 
 942     msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
 943 
 944     msg_id++;
 945     msg->id = msg_id;
 946     msg->header.error = CS_OK;
 947 
 948     msg->host.type = dest;
 949 
 950     if (node != NULL) {
 951         if (node->name != NULL) {
 952             target = pcmk__str_copy(node->name);
 953             msg->host.size = strlen(node->name);
 954             memset(msg->host.uname, 0, MAX_NAME);
 955             memcpy(msg->host.uname, node->name, msg->host.size);
 956 
 957         } else {
 958             target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id);
 959         }
 960         msg->host.id = node->cluster_layer_id;
 961 
 962     } else {
 963         target = pcmk__str_copy("all");
 964     }
 965 
 966     msg->sender.id = 0;
 967     msg->sender.type = pcmk__parse_server(crm_system_name);
 968     msg->sender.pid = local_pid;
 969     msg->sender.size = local_name_len;
 970     memset(msg->sender.uname, 0, MAX_NAME);
 971 
 972     if ((local_name != NULL) && (msg->sender.size != 0)) {
 973         memcpy(msg->sender.uname, local_name, msg->sender.size);
 974     }
 975 
 976     msg->size = 1 + strlen(data);
 977     msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
 978 
 979     if (msg->size < CRM_BZ2_THRESHOLD) {
 980         msg = pcmk__realloc(msg, msg->header.size);
 981         memcpy(msg->data, data, msg->size);
 982 
 983     } else {
 984         char *compressed = NULL;
 985         unsigned int new_size = 0;
 986 
 987         if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
 988                            &new_size) == pcmk_rc_ok) {
 989 
 990             msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
 991             msg = pcmk__realloc(msg, msg->header.size);
 992             memcpy(msg->data, compressed, new_size);
 993 
 994             msg->is_compressed = TRUE;
 995             msg->compressed_size = new_size;
 996 
 997         } else {
 998             // cppcheck seems not to understand the abort logic in pcmk__realloc
 999             // cppcheck-suppress memleak
1000             msg = pcmk__realloc(msg, msg->header.size);
1001             memcpy(msg->data, data, msg->size);
1002         }
1003 
1004         free(compressed);
1005     }
1006 
1007     iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1008     iov->iov_base = msg;
1009     iov->iov_len = msg->header.size;
1010 
1011     if (msg->compressed_size > 0) {
1012         crm_trace("Queueing CPG message %u to %s "
1013                   "(%llu bytes, %d bytes compressed payload): %.200s",
1014                   msg->id, target, (unsigned long long) iov->iov_len,
1015                   msg->compressed_size, data);
1016     } else {
1017         crm_trace("Queueing CPG message %u to %s "
1018                   "(%llu bytes, %d bytes payload): %.200s",
1019                   msg->id, target, (unsigned long long) iov->iov_len,
1020                   msg->size, data);
1021     }
1022 
1023     free(target);
1024 
1025     cs_message_queue = g_list_append(cs_message_queue, iov);
1026     crm_cs_flush(&pcmk_cpg_handle);
1027 
1028     return true;
1029 }
1030 
1031 /*!
1032  * \internal
1033  * \brief Send an XML message via Corosync CPG
1034  *
1035  * \param[in] msg   XML message to send
1036  * \param[in] node  Cluster node to send message to
1037  * \param[in] dest  Type of message to send
1038  *
1039  * \return TRUE on success, otherwise FALSE
1040  */
1041 bool
1042 pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1043                    enum pcmk_ipc_server dest)
1044 {
1045     bool rc = true;
1046     GString *data = g_string_sized_new(1024);
1047 
1048     pcmk__xml_string(msg, 0, data, 0);
1049 
1050     rc = send_cpg_text(data->str, node, dest);
1051     g_string_free(data, TRUE);
1052     return rc;
1053 }

/* [previous][next][first][last][top][bottom][index][help] */