root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. pcmk__cpg_local_nodeid
  2. crm_cs_flush_cb
  3. crm_cs_flush
  4. pcmk_cpg_dispatch
  5. ais_dest
  6. msg_type2text
  7. check_message_sanity
  8. pcmk__cpg_message_data
  9. cmp_member_list_nodeid
  10. cpgreason2str
  11. peer_name
  12. node_left
  13. pcmk__cpg_confchg_cb
  14. pcmk_cpg_set_deliver_fn
  15. pcmk_cpg_set_confchg_fn
  16. pcmk__cpg_connect
  17. pcmk__cpg_disconnect
  18. send_cpg_text
  19. pcmk__cpg_send_xml
  20. cluster_connect_cpg
  21. cluster_disconnect_cpg
  22. get_local_nodeid
  23. pcmk_cpg_membership
  24. send_cluster_text
  25. pcmk_message_common_cs
  26. text2msg_type

   1 /*
   2  * Copyright 2004-2024 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <arpa/inet.h>
  13 #include <inttypes.h>                   // PRIu32
  14 #include <netdb.h>
  15 #include <netinet/in.h>
  16 #include <stdbool.h>
  17 #include <stdint.h>                     // uint32_t
  18 #include <sys/socket.h>
  19 #include <sys/types.h>                  // size_t
  20 #include <sys/utsname.h>
  21 
  22 #include <bzlib.h>
  23 #include <corosync/corodefs.h>
  24 #include <corosync/corotypes.h>
  25 #include <corosync/hdb.h>
  26 #include <corosync/cpg.h>
  27 #include <qb/qbipc_common.h>
  28 #include <qb/qbipcc.h>
  29 #include <qb/qbutil.h>
  30 
  31 #include <crm/cluster/internal.h>
  32 #include <crm/common/ipc.h>
  33 #include <crm/common/ipc_internal.h>    // PCMK__SPECIAL_PID
  34 #include <crm/common/mainloop.h>
  35 #include <crm/common/xml.h>
  36 
  37 #include "crmcluster_private.h"
  38 
  39 /* @TODO Once we can update the public API to require pcmk_cluster_t* in more
  40  *       functions, we can ditch this in favor of cluster->cpg_handle.
  41  */
  42 static cpg_handle_t pcmk_cpg_handle = 0;
  43 
  44 // @TODO These could be moved to pcmk_cluster_t* at that time as well
  45 static bool cpg_evicted = false;        // Set by pcmk__cpg_confchg_cb() when the local node is missing from the member list; checked by pcmk_cpg_dispatch()
  46 static GList *cs_message_queue = NULL;  // Pending outbound messages (struct iovec *), drained by crm_cs_flush()
  47 static int cs_message_timer = 0;        // ID returned by g_timeout_add() for a scheduled flush, or 0 if none is pending
  48 
  49 struct pcmk__cpg_host_s {
  50     uint32_t id;
  51     uint32_t pid;
  52     gboolean local;
  53     enum crm_ais_msg_types type;
  54     uint32_t size;
  55     char uname[MAX_NAME];
  56 } __attribute__ ((packed));
  57 
  58 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
  59 
  60 struct pcmk__cpg_msg_s {
  61     struct qb_ipc_response_header header __attribute__ ((aligned(8)));
  62     uint32_t id;
  63     gboolean is_compressed;
  64 
  65     pcmk__cpg_host_t host;
  66     pcmk__cpg_host_t sender;
  67 
  68     uint32_t size;
  69     uint32_t compressed_size;
  70     /* 584 bytes */
  71     char data[0];
  72 
  73 } __attribute__ ((packed));
  74 
  75 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
  76 
  77 static void crm_cs_flush(gpointer data);
  78 
  /* Length in bytes of a message's payload as it was sent: the compressed size
   * for compressed messages, otherwise the plain size. The argument is
   * parenthesized so that any pointer expression (such as &local_msg) works.
   */
  #define msg_data_len(msg) \
      ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
  80 
  /* Evaluate \p code (a Corosync call returning cs_error_t), storing its result
   * in \p rc. While the result is CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL,
   * increment \p counter, sleep for \p counter seconds, and try again, giving
   * up once \p counter reaches \p max. Any other result ends the loop at once.
   *
   * All arguments are parenthesized so that compound expressions (for example
   * a conditional expression passed as \p max) keep their meaning.
   */
  #define cs_repeat(rc, counter, max, code) do {                          \
          (rc) = (code);                                                  \
          if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) { \
              (counter)++;                                                \
              crm_debug("Retrying operation after %ds", (counter));       \
              sleep(counter);                                             \
          } else {                                                        \
              break;                                                      \
          }                                                               \
      } while ((counter) < (max))
  91 
  92 /*!
  93  * \internal
  94  * \brief Get the local Corosync node ID (via CPG)
  95  *
  96  * \param[in] handle  CPG connection to use (or 0 to use new connection)
  97  *
  98  * \return Corosync ID of local node (or 0 if not known)
  99  */
 100 uint32_t
 101 pcmk__cpg_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
 102 {
 103     cs_error_t rc = CS_OK;
 104     int retries = 0;
 105     static uint32_t local_nodeid = 0;
 106     cpg_handle_t local_handle = handle;
 107     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
 108     int fd = -1;
 109     uid_t found_uid = 0;
 110     gid_t found_gid = 0;
 111     pid_t found_pid = 0;
 112     int rv = 0;
 113 
 114     if (local_nodeid != 0) {
 115         return local_nodeid;
 116     }
 117 
 118     if (handle == 0) {
 119         crm_trace("Creating connection");
 120         cs_repeat(rc, retries, 5,
 121                   cpg_model_initialize(&local_handle, CPG_MODEL_V1,
 122                                        (cpg_model_data_t *) &cpg_model_info,
 123                                        NULL));
 124         if (rc != CS_OK) {
 125             crm_err("Could not connect to the CPG API: %s (%d)",
 126                     cs_strerror(rc), rc);
 127             return 0;
 128         }
 129 
 130         rc = cpg_fd_get(local_handle, &fd);
 131         if (rc != CS_OK) {
 132             crm_err("Could not obtain the CPG API connection: %s (%d)",
 133                     cs_strerror(rc), rc);
 134             goto bail;
 135         }
 136 
 137         // CPG provider run as root (at least in given user namespace)?
 138         rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
 139                                           &found_uid, &found_gid);
 140         if (rv == 0) {
 141             crm_err("CPG provider is not authentic:"
 142                     " process %lld (uid: %lld, gid: %lld)",
 143                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 144                     (long long) found_uid, (long long) found_gid);
 145             goto bail;
 146 
 147         } else if (rv < 0) {
 148             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 149                     strerror(-rv), -rv);
 150             goto bail;
 151         }
 152     }
 153 
 154     if (rc == CS_OK) {
 155         retries = 0;
 156         crm_trace("Performing lookup");
 157         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
 158     }
 159 
 160     if (rc != CS_OK) {
 161         crm_err("Could not get local node id from the CPG API: %s (%d)",
 162                 pcmk__cs_err_str(rc), rc);
 163     }
 164 
 165 bail:
 166     if (handle == 0) {
 167         crm_trace("Closing connection");
 168         cpg_finalize(local_handle);
 169     }
 170     crm_debug("Local nodeid is %u", local_nodeid);
 171     return local_nodeid;
 172 }
 173 
 174 /*!
 175  * \internal
 176  * \brief Callback function for Corosync message queue timer
 177  *
 178  * \param[in] data  CPG handle
 179  *
 180  * \return FALSE (to indicate to glib that timer should not be removed)
 181  */
 182 static gboolean
 183 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 184 {
 185     cs_message_timer = 0;
 186     crm_cs_flush(data);
 187     return FALSE;
 188 }
 189 
 190 // Send no more than this many CPG messages in one flush
 191 #define CS_SEND_MAX 200
 192 
 193 /*!
 194  * \internal
 195  * \brief Send messages in Corosync CPG message queue
 196  *
 197  * \param[in] data   CPG handle
 198  */
 199 static void
 200 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 201 {
 202     unsigned int sent = 0;
 203     guint queue_len = 0;
 204     cs_error_t rc = 0;
 205     cpg_handle_t *handle = (cpg_handle_t *) data;
 206 
 207     if (*handle == 0) {
 208         crm_trace("Connection is dead");
 209         return;
 210     }
 211 
 212     queue_len = g_list_length(cs_message_queue);
 213     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
 214         crm_err("CPG queue has grown to %d", queue_len);
 215 
 216     } else if (queue_len == CS_SEND_MAX) {
 217         crm_warn("CPG queue has grown to %d", queue_len);
 218     }
 219 
 220     if (cs_message_timer != 0) {
 221         /* There is already a timer, wait until it goes off */
 222         crm_trace("Timer active %d", cs_message_timer);
 223         return;
 224     }
 225 
 226     while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
 227         struct iovec *iov = cs_message_queue->data;
 228 
 229         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 230         if (rc != CS_OK) {
 231             break;
 232         }
 233 
 234         sent++;
 235         crm_trace("CPG message sent, size=%llu",
 236                   (unsigned long long) iov->iov_len);
 237 
 238         cs_message_queue = g_list_remove(cs_message_queue, iov);
 239         free(iov->iov_base);
 240         free(iov);
 241     }
 242 
 243     queue_len -= sent;
 244     do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
 245                "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
 246                sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
 247                (int) rc);
 248 
 249     if (cs_message_queue) {
 250         uint32_t delay_ms = 100;
 251         if (rc != CS_OK) {
 252             /* Proportionally more if sending failed but cap at 1s */
 253             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 254         }
 255         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 256     }
 257 }
 258 
 259 /*!
 260  * \internal
 261  * \brief Dispatch function for CPG handle
 262  *
 263  * \param[in,out] user_data  Cluster object
 264  *
 265  * \return 0 on success, -1 on error (per mainloop_io_t interface)
 266  */
 267 static int
 268 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 269 {
 270     cs_error_t rc = CS_OK;
 271     pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
 272 
 273     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 274     if (rc != CS_OK) {
 275         crm_err("Connection to the CPG API failed: %s (%d)",
 276                 pcmk__cs_err_str(rc), rc);
 277         cpg_finalize(cluster->cpg_handle);
 278         cluster->cpg_handle = 0;
 279         return -1;
 280 
 281     } else if (cpg_evicted) {
 282         crm_err("Evicted from CPG membership");
 283         return -1;
 284     }
 285     return 0;
 286 }
 287 
 288 static inline const char *
 289 ais_dest(const pcmk__cpg_host_t *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 290 {
 291     if (host->local) {
 292         return "local";
 293     } else if (host->size > 0) {
 294         return host->uname;
 295     } else {
 296         return "<all>";
 297     }
 298 }
 299 
 300 static inline const char *
 301 msg_type2text(enum crm_ais_msg_types type)
     /* [previous][next][first][last][top][bottom][index][help] */
 302 {
 303     const char *text = "unknown";
 304 
 305     switch (type) {
 306         case crm_msg_none:
 307             text = "unknown";
 308             break;
 309         case crm_msg_ais:
 310             text = "ais";
 311             break;
 312         case crm_msg_cib:
 313             text = "cib";
 314             break;
 315         case crm_msg_crmd:
 316             text = "crmd";
 317             break;
 318         case crm_msg_pe:
 319             text = "pengine";
 320             break;
 321         case crm_msg_te:
 322             text = "tengine";
 323             break;
 324         case crm_msg_lrmd:
 325             text = "lrmd";
 326             break;
 327         case crm_msg_attrd:
 328             text = "attrd";
 329             break;
 330         case crm_msg_stonithd:
 331             text = "stonithd";
 332             break;
 333         case crm_msg_stonith_ng:
 334             text = "stonith-ng";
 335             break;
 336     }
 337     return text;
 338 }
 339 
 340 /*!
 341  * \internal
 342  * \brief Check whether a Corosync CPG message is valid
 343  *
 344  * \param[in] msg   Corosync CPG message to check
 345  *
 346  * \return true if \p msg is valid, otherwise false
 347  */
 348 static bool
 349 check_message_sanity(const pcmk__cpg_msg_t *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 350 {
 351     int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
 352 
 353     if (payload_size < 1) {
 354         crm_err("%sCPG message %d from %s invalid: "
 355                 "Claimed size of %d bytes is too small "
 356                 CRM_XS " from %s[%u] to %s@%s",
 357                 (msg->is_compressed? "Compressed " : ""),
 358                 msg->id, ais_dest(&(msg->sender)),
 359                 (int) msg->header.size,
 360                 msg_type2text(msg->sender.type), msg->sender.pid,
 361                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 362         return false;
 363     }
 364 
 365     if (msg->header.error != CS_OK) {
 366         crm_err("%sCPG message %d from %s invalid: "
 367                 "Sender indicated error %d "
 368                 CRM_XS " from %s[%u] to %s@%s",
 369                 (msg->is_compressed? "Compressed " : ""),
 370                 msg->id, ais_dest(&(msg->sender)),
 371                 msg->header.error,
 372                 msg_type2text(msg->sender.type), msg->sender.pid,
 373                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 374         return false;
 375     }
 376 
 377     if (msg_data_len(msg) != payload_size) {
 378         crm_err("%sCPG message %d from %s invalid: "
 379                 "Total size %d inconsistent with payload size %d "
 380                 CRM_XS " from %s[%u] to %s@%s",
 381                 (msg->is_compressed? "Compressed " : ""),
 382                 msg->id, ais_dest(&(msg->sender)),
 383                 (int) msg->header.size, (int) msg_data_len(msg),
 384                 msg_type2text(msg->sender.type), msg->sender.pid,
 385                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 386         return false;
 387     }
 388 
 389     if (!msg->is_compressed &&
 390         /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
 391          * but checking the last byte or two should be quick
 392          */
 393         (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
 394          || (msg->data[msg->size - 1] != '\0'))) {
 395         crm_err("CPG message %d from %s invalid: "
 396                 "Payload does not end at byte %llu "
 397                 CRM_XS " from %s[%u] to %s@%s",
 398                 msg->id, ais_dest(&(msg->sender)),
 399                 (unsigned long long) msg->size,
 400                 msg_type2text(msg->sender.type), msg->sender.pid,
 401                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 402         return false;
 403     }
 404 
 405     crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
 406               (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
 407               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
 408               ais_dest(&(msg->sender)),
 409               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 410     return true;
 411 }
 412 
 413 /*!
 414  * \internal
 415  * \brief Extract text data from a Corosync CPG message
 416  *
 417  * \param[in]     handle     CPG connection (to get local node ID if not known)
 418  * \param[in]     sender_id  Corosync ID of node that sent message
 419  * \param[in]     pid        Process ID of message sender (for logging only)
 420  * \param[in,out] content    CPG message
 421  * \param[out]    kind       If not \c NULL, will be set to CPG header ID
 422  *                           (which should be an <tt>enum crm_ais_msg_class</tt>
 423  *                           value, currently always \c crm_class_cluster)
 424  * \param[out]    from       If not \c NULL, will be set to sender uname
 425  *                           (valid for the lifetime of \p content)
 426  *
 427  * \return Newly allocated string with message data
 428  *
 429  * \note The caller is responsible for freeing the return value using \c free().
 430  */
 431 char *
 432 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
     /* [previous][next][first][last][top][bottom][index][help] */
 433                        void *content, uint32_t *kind, const char **from)
 434 {
 435     char *data = NULL;
 436     pcmk__cpg_msg_t *msg = content;
 437 
 438     if (handle != 0) {
 439         // Do filtering and field massaging
 440         uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
 441         const char *local_name = pcmk__cluster_local_node_name();
 442 
 443         if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
 444             crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
 445                     ": claimed nodeid=%" PRIu32,
 446                     sender_id, pid, msg->sender.id);
 447             return NULL;
 448         }
 449         if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
 450             crm_trace("Not for us: %" PRIu32" != %" PRIu32,
 451                       msg->host.id, local_nodeid);
 452             return NULL;
 453         }
 454         if ((msg->host.size > 0)
 455             && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 456 
 457             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 458             return NULL;
 459         }
 460 
 461         msg->sender.id = sender_id;
 462         if (msg->sender.size == 0) {
 463             const crm_node_t *peer =
 464                 pcmk__get_node(sender_id, NULL, NULL,
 465                                pcmk__node_search_cluster_member);
 466 
 467             if (peer->uname == NULL) {
 468                 crm_err("No uname for peer with nodeid=%u", sender_id);
 469 
 470             } else {
 471                 crm_notice("Fixing uname for peer with nodeid=%u", sender_id);
 472                 msg->sender.size = strlen(peer->uname);
 473                 memset(msg->sender.uname, 0, MAX_NAME);
 474                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 475             }
 476         }
 477     }
 478 
 479     crm_trace("Got new%s message (size=%d, %d, %d)",
 480               msg->is_compressed ? " compressed" : "",
 481               msg_data_len(msg), msg->size, msg->compressed_size);
 482 
 483     if (kind != NULL) {
 484         *kind = msg->header.id;
 485     }
 486     if (from != NULL) {
 487         *from = msg->sender.uname;
 488     }
 489 
 490     if (msg->is_compressed && (msg->size > 0)) {
 491         int rc = BZ_OK;
 492         char *uncompressed = NULL;
 493         unsigned int new_size = msg->size + 1;
 494 
 495         if (!check_message_sanity(msg)) {
 496             goto badmsg;
 497         }
 498 
 499         crm_trace("Decompressing message data");
 500         uncompressed = pcmk__assert_alloc(1, new_size);
 501         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
 502                                         msg->compressed_size, 1, 0);
 503 
 504         rc = pcmk__bzlib2rc(rc);
 505 
 506         if (rc != pcmk_rc_ok) {
 507             crm_err("Decompression failed: %s " CRM_XS " rc=%d",
 508                     pcmk_rc_str(rc), rc);
 509             free(uncompressed);
 510             goto badmsg;
 511         }
 512 
 513         pcmk__assert(new_size == msg->size);
 514 
 515         data = uncompressed;
 516 
 517     } else if (!check_message_sanity(msg)) {
 518         goto badmsg;
 519 
 520     } else {
 521         data = strdup(msg->data);
 522     }
 523 
 524     // Is this necessary?
 525     pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
 526                    pcmk__node_search_cluster_member);
 527 
 528     crm_trace("Payload: %.200s", data);
 529     return data;
 530 
 531   badmsg:
 532     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 533             " min=%d, total=%d, size=%d, bz2_size=%d",
 534             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 535             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 536             msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
 537             msg->header.size, msg->size, msg->compressed_size);
 538 
 539     free(data);
 540     return NULL;
 541 }
 542 
 543 /*!
 544  * \internal
 545  * \brief Compare cpg_address objects by node ID
 546  *
 547  * \param[in] first   First cpg_address structure to compare
 548  * \param[in] second  Second cpg_address structure to compare
 549  *
 550  * \return Negative number if first's node ID is lower,
 551  *         positive number if first's node ID is greater,
 552  *         or 0 if both node IDs are equal
 553  */
 554 static int
 555 cmp_member_list_nodeid(const void *first, const void *second)
     /* [previous][next][first][last][top][bottom][index][help] */
 556 {
 557     const struct cpg_address *const a = *((const struct cpg_address **) first),
 558                              *const b = *((const struct cpg_address **) second);
 559     if (a->nodeid < b->nodeid) {
 560         return -1;
 561     } else if (a->nodeid > b->nodeid) {
 562         return 1;
 563     }
 564     /* don't bother with "reason" nor "pid" */
 565     return 0;
 566 }
 567 
 568 /*!
 569  * \internal
 570  * \brief Get a readable string equivalent of a cpg_reason_t value
 571  *
 572  * \param[in] reason  CPG reason value
 573  *
 574  * \return Readable string suitable for logging
 575  */
 576 static const char *
 577 cpgreason2str(cpg_reason_t reason)
     /* [previous][next][first][last][top][bottom][index][help] */
 578 {
 579     switch (reason) {
 580         case CPG_REASON_JOIN:       return " via cpg_join";
 581         case CPG_REASON_LEAVE:      return " via cpg_leave";
 582         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 583         case CPG_REASON_NODEUP:     return " via cluster join";
 584         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 585         default:                    break;
 586     }
 587     return "";
 588 }
 589 
 590 /*!
 591  * \internal
 592  * \brief Get a log-friendly node name
 593  *
 594  * \param[in] peer  Node to check
 595  *
 596  * \return Node's uname, or readable string if not known
 597  */
 598 static inline const char *
 599 peer_name(const crm_node_t *peer)
     /* [previous][next][first][last][top][bottom][index][help] */
 600 {
 601     if (peer == NULL) {
 602         return "unknown node";
 603     } else if (peer->uname == NULL) {
 604         return "peer node";
 605     } else {
 606         return peer->uname;
 607     }
 608 }
 609 
 610 /*!
 611  * \internal
 612  * \brief Process a CPG peer's leaving the cluster
 613  *
 614  * \param[in] cpg_group_name      CPG group name (for logging)
 615  * \param[in] event_counter       Event number (for logging)
 616  * \param[in] local_nodeid        Node ID of local node
 617  * \param[in] cpg_peer            CPG peer that left
 618  * \param[in] sorted_member_list  List of remaining members, qsort()-ed by ID
 619  * \param[in] member_list_entries Number of entries in \p sorted_member_list
 620  */
 621 static void
 622 node_left(const char *cpg_group_name, int event_counter,
     /* [previous][next][first][last][top][bottom][index][help] */
 623           uint32_t local_nodeid, const struct cpg_address *cpg_peer,
 624           const struct cpg_address **sorted_member_list,
 625           size_t member_list_entries)
 626 {
 627     crm_node_t *peer =
 628         pcmk__search_node_caches(cpg_peer->nodeid, NULL,
 629                                  pcmk__node_search_cluster_member);
 630     const struct cpg_address **rival = NULL;
 631 
 632     /* Most CPG-related Pacemaker code assumes that only one process on a node
 633      * can be in the process group, but Corosync does not impose this
 634      * limitation, and more than one can be a member in practice due to a
 635      * daemon attempting to start while another instance is already running.
 636      *
 637      * Check for any such duplicate instances, because we don't want to process
 638      * their leaving as if our actual peer left. If the peer that left still has
 639      * an entry in sorted_member_list (with a different PID), we will ignore the
 640      * leaving.
 641      *
 642      * @TODO Track CPG members' PIDs so we can tell exactly who left.
 643      */
 644     if (peer != NULL) {
 645         rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
 646                         sizeof(const struct cpg_address *),
 647                         cmp_member_list_nodeid);
 648     }
 649 
 650     if (rival == NULL) {
 651         crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 652                  cpg_group_name, event_counter, peer_name(peer),
 653                  cpg_peer->nodeid, cpg_peer->pid,
 654                  cpgreason2str(cpg_peer->reason));
 655         if (peer != NULL) {
 656             crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 657                                  PCMK_VALUE_OFFLINE);
 658         }
 659     } else if (cpg_peer->nodeid == local_nodeid) {
 660         crm_warn("Group %s event %d: duplicate local pid %u left%s",
 661                  cpg_group_name, event_counter,
 662                  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
 663     } else {
 664         crm_warn("Group %s event %d: "
 665                  "%s (node %u) duplicate pid %u left%s (%u remains)",
 666                  cpg_group_name, event_counter, peer_name(peer),
 667                  cpg_peer->nodeid, cpg_peer->pid,
 668                  cpgreason2str(cpg_peer->reason), (*rival)->pid);
 669     }
 670 }
 671 
 672 /*!
 673  * \internal
 674  * \brief Handle a CPG configuration change event
 675  *
 676  * \param[in] handle               CPG connection
 677  * \param[in] group_name           CPG group name
 678  * \param[in] member_list          List of current CPG members
 679  * \param[in] member_list_entries  Number of entries in \p member_list
 680  * \param[in] left_list            List of CPG members that left
 681  * \param[in] left_list_entries    Number of entries in \p left_list
 682  * \param[in] joined_list          List of CPG members that joined
 683  * \param[in] joined_list_entries  Number of entries in \p joined_list
 684  *
 685  * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
 686  *       \c cpg_callbacks_t object.
 687  */
 688 void
 689 pcmk__cpg_confchg_cb(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 690                      const struct cpg_name *group_name,
 691                      const struct cpg_address *member_list,
 692                      size_t member_list_entries,
 693                      const struct cpg_address *left_list,
 694                      size_t left_list_entries,
 695                      const struct cpg_address *joined_list,
 696                      size_t joined_list_entries)
 697 {
 698     static int counter = 0;
 699 
 700     bool found = false;
 701     uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
 702     const struct cpg_address **sorted = NULL;
 703 
 704     sorted = pcmk__assert_alloc(member_list_entries,
 705                                 sizeof(const struct cpg_address *));
 706 
 707     for (size_t iter = 0; iter < member_list_entries; iter++) {
 708         sorted[iter] = member_list + iter;
 709     }
 710 
 711     // So that the cross-matching of multiply-subscribed nodes is then cheap
 712     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 713           cmp_member_list_nodeid);
 714 
 715     for (int i = 0; i < left_list_entries; i++) {
 716         node_left(group_name->value, counter, local_nodeid, &left_list[i],
 717                   sorted, member_list_entries);
 718     }
 719     free(sorted);
 720     sorted = NULL;
 721 
 722     for (int i = 0; i < joined_list_entries; i++) {
 723         crm_info("Group %s event %d: node %u pid %u joined%s",
 724                  group_name->value, counter, joined_list[i].nodeid,
 725                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 726     }
 727 
 728     for (int i = 0; i < member_list_entries; i++) {
 729         crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
 730                                           pcmk__node_search_cluster_member);
 731 
 732         if (member_list[i].nodeid == local_nodeid
 733                 && member_list[i].pid != getpid()) {
 734             // See the note in node_left()
 735             crm_warn("Group %s event %d: detected duplicate local pid %u",
 736                      group_name->value, counter, member_list[i].pid);
 737             continue;
 738         }
 739         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 740                  group_name->value, counter, peer_name(peer),
 741                  member_list[i].nodeid, member_list[i].pid);
 742 
 743         /* If the caller left auto-reaping enabled, this will also update the
 744          * state to member.
 745          */
 746         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 747                                     PCMK_VALUE_ONLINE);
 748 
 749         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
 750             /* The node is a CPG member, but we currently think it's not a
 751              * cluster member. This is possible only if auto-reaping was
 752              * disabled. The node may be joining, and we happened to get the CPG
 753              * notification before the quorum notification; or the node may have
 754              * just died, and we are processing its final messages; or a bug
 755              * has affected the peer cache.
 756              */
 757             time_t now = time(NULL);
 758 
 759             if (peer->when_lost == 0) {
 760                 // Track when we first got into this contradictory state
 761                 peer->when_lost = now;
 762 
 763             } else if (now > (peer->when_lost + 60)) {
 764                 // If it persists for more than a minute, update the state
 765                 crm_warn("Node %u is member of group %s but was believed "
 766                          "offline",
 767                          member_list[i].nodeid, group_name->value);
 768                 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
 769             }
 770         }
 771 
 772         if (local_nodeid == member_list[i].nodeid) {
 773             found = true;
 774         }
 775     }
 776 
 777     if (!found) {
 778         crm_err("Local node was evicted from group %s", group_name->value);
 779         cpg_evicted = true;
 780     }
 781 
 782     counter++;
 783 }
 784 
 785 /*!
 786  * \brief Set the CPG deliver callback function for a cluster object
 787  *
 788  * \param[in,out] cluster  Cluster object
 789  * \param[in]     fn       Deliver callback function to set
 790  *
 791  * \return Standard Pacemaker return code
 792  */
 793 int
 794 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
     /* [previous][next][first][last][top][bottom][index][help] */
 795 {
 796     if (cluster == NULL) {
 797         return EINVAL;
 798     }
 799     cluster->cpg.cpg_deliver_fn = fn;
 800     return pcmk_rc_ok;
 801 }
 802 
 803 /*!
 804  * \brief Set the CPG config change callback function for a cluster object
 805  *
 806  * \param[in,out] cluster  Cluster object
 807  * \param[in]     fn       Configuration change callback function to set
 808  *
 809  * \return Standard Pacemaker return code
 810  */
 811 int
 812 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
     /* [previous][next][first][last][top][bottom][index][help] */
 813 {
 814     if (cluster == NULL) {
 815         return EINVAL;
 816     }
 817     cluster->cpg.cpg_confchg_fn = fn;
 818     return pcmk_rc_ok;
 819 }
 820 
 821 /*!
 822  * \brief Connect to Corosync CPG
 823  *
 824  * \param[in,out] cluster  Initialized cluster object to connect
 825  *
 826  * \return Standard Pacemaker return code
 827  */
 828 int
 829 pcmk__cpg_connect(pcmk_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 830 {
 831     cs_error_t rc;
 832     int fd = -1;
 833     int retries = 0;
 834     uint32_t id = 0;
 835     crm_node_t *peer = NULL;
 836     cpg_handle_t handle = 0;
 837     const char *message_name = pcmk__message_name(crm_system_name);
 838     uid_t found_uid = 0;
 839     gid_t found_gid = 0;
 840     pid_t found_pid = 0;
 841     int rv;
 842 
 843     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 844         .dispatch = pcmk_cpg_dispatch,
 845         .destroy = cluster->destroy,
 846     };
 847 
 848     cpg_model_v1_data_t cpg_model_info = {
 849             .model = CPG_MODEL_V1,
 850             .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 851             .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 852             .cpg_totem_confchg_fn = NULL,
 853             .flags = 0,
 854     };
 855 
 856     cpg_evicted = false;
 857     cluster->group.length = 0;
 858     cluster->group.value[0] = 0;
 859 
 860     /* group.value is char[128] */
 861     strncpy(cluster->group.value, message_name, 127);
 862     cluster->group.value[127] = 0;
 863     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 864 
 865     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
 866     if (rc != CS_OK) {
 867         crm_err("Could not connect to the CPG API: %s (%d)",
 868                 cs_strerror(rc), rc);
 869         goto bail;
 870     }
 871 
 872     rc = cpg_fd_get(handle, &fd);
 873     if (rc != CS_OK) {
 874         crm_err("Could not obtain the CPG API connection: %s (%d)",
 875                 cs_strerror(rc), rc);
 876         goto bail;
 877     }
 878 
 879     /* CPG provider run as root (in given user namespace, anyway)? */
 880     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 881                                             &found_uid, &found_gid))) {
 882         crm_err("CPG provider is not authentic:"
 883                 " process %lld (uid: %lld, gid: %lld)",
 884                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 885                 (long long) found_uid, (long long) found_gid);
 886         rc = CS_ERR_ACCESS;
 887         goto bail;
 888     } else if (rv < 0) {
 889         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 890                 strerror(-rv), -rv);
 891         rc = CS_ERR_ACCESS;
 892         goto bail;
 893     }
 894 
 895     id = pcmk__cpg_local_nodeid(handle);
 896     if (id == 0) {
 897         crm_err("Could not get local node id from the CPG API");
 898         goto bail;
 899 
 900     }
 901     cluster->nodeid = id;
 902 
 903     retries = 0;
 904     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
 905     if (rc != CS_OK) {
 906         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
 907         goto bail;
 908     }
 909 
 910     pcmk_cpg_handle = handle;
 911     cluster->cpg_handle = handle;
 912     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 913 
 914   bail:
 915     if (rc != CS_OK) {
 916         cpg_finalize(handle);
 917         // @TODO Map rc to more specific Pacemaker return code
 918         return ENOTCONN;
 919     }
 920 
 921     peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
 922     crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
 923     return pcmk_rc_ok;
 924 }
 925 
 926 /*!
 927  * \internal
 928  * \brief Disconnect from Corosync CPG
 929  *
 930  * \param[in,out] cluster  Cluster object to disconnect
 931  */
 932 void
 933 pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 934 {
 935     pcmk_cpg_handle = 0;
 936     if (cluster->cpg_handle != 0) {
 937         crm_trace("Disconnecting CPG");
 938         cpg_leave(cluster->cpg_handle, &cluster->group);
 939         cpg_finalize(cluster->cpg_handle);
 940         cluster->cpg_handle = 0;
 941 
 942     } else {
 943         crm_info("No CPG connection");
 944     }
 945 }
 946 
 947 /*!
 948  * \internal
 949  * \brief Send string data via Corosync CPG
 950  *
 951  * \param[in] data   Data to send
 952  * \param[in] local  What to set as host "local" value (which is never used)
 953  * \param[in] node   Cluster node to send message to
 954  * \param[in] dest   Type of message to send
 955  *
 956  * \return \c true on success, or \c false otherwise
 957  */
 958 static bool
 959 send_cpg_text(const char *data, bool local, const crm_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
 960               enum crm_ais_msg_types dest)
 961 {
 962     // @COMPAT Drop local argument when send_cluster_text is dropped
 963     static int msg_id = 0;
 964     static int local_pid = 0;
 965     static int local_name_len = 0;
 966     static const char *local_name = NULL;
 967 
 968     char *target = NULL;
 969     struct iovec *iov;
 970     pcmk__cpg_msg_t *msg = NULL;
 971 
 972     CRM_CHECK(dest != crm_msg_ais, return false);
 973 
 974     if (local_name == NULL) {
 975         local_name = pcmk__cluster_local_node_name();
 976     }
 977     if ((local_name_len == 0) && (local_name != NULL)) {
 978         local_name_len = strlen(local_name);
 979     }
 980 
 981     if (data == NULL) {
 982         data = "";
 983     }
 984 
 985     if (local_pid == 0) {
 986         local_pid = getpid();
 987     }
 988 
 989     msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
 990 
 991     msg_id++;
 992     msg->id = msg_id;
 993     msg->header.id = crm_class_cluster;
 994     msg->header.error = CS_OK;
 995 
 996     msg->host.type = dest;
 997     msg->host.local = local;
 998 
 999     if (node != NULL) {
1000         if (node->uname != NULL) {
1001             target = pcmk__str_copy(node->uname);
1002             msg->host.size = strlen(node->uname);
1003             memset(msg->host.uname, 0, MAX_NAME);
1004             memcpy(msg->host.uname, node->uname, msg->host.size);
1005 
1006         } else {
1007             target = crm_strdup_printf("%u", node->id);
1008         }
1009         msg->host.id = node->id;
1010 
1011     } else {
1012         target = pcmk__str_copy("all");
1013     }
1014 
1015     msg->sender.id = 0;
1016     msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name);
1017     msg->sender.pid = local_pid;
1018     msg->sender.size = local_name_len;
1019     memset(msg->sender.uname, 0, MAX_NAME);
1020 
1021     if ((local_name != NULL) && (msg->sender.size != 0)) {
1022         memcpy(msg->sender.uname, local_name, msg->sender.size);
1023     }
1024 
1025     msg->size = 1 + strlen(data);
1026     msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
1027 
1028     if (msg->size < CRM_BZ2_THRESHOLD) {
1029         msg = pcmk__realloc(msg, msg->header.size);
1030         memcpy(msg->data, data, msg->size);
1031 
1032     } else {
1033         char *compressed = NULL;
1034         unsigned int new_size = 0;
1035 
1036         if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
1037                            &new_size) == pcmk_rc_ok) {
1038 
1039             msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1040             msg = pcmk__realloc(msg, msg->header.size);
1041             memcpy(msg->data, compressed, new_size);
1042 
1043             msg->is_compressed = TRUE;
1044             msg->compressed_size = new_size;
1045 
1046         } else {
1047             // cppcheck seems not to understand the abort logic in pcmk__realloc
1048             // cppcheck-suppress memleak
1049             msg = pcmk__realloc(msg, msg->header.size);
1050             memcpy(msg->data, data, msg->size);
1051         }
1052 
1053         free(compressed);
1054     }
1055 
1056     iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1057     iov->iov_base = msg;
1058     iov->iov_len = msg->header.size;
1059 
1060     if (msg->compressed_size > 0) {
1061         crm_trace("Queueing CPG message %u to %s "
1062                   "(%llu bytes, %d bytes compressed payload): %.200s",
1063                   msg->id, target, (unsigned long long) iov->iov_len,
1064                   msg->compressed_size, data);
1065     } else {
1066         crm_trace("Queueing CPG message %u to %s "
1067                   "(%llu bytes, %d bytes payload): %.200s",
1068                   msg->id, target, (unsigned long long) iov->iov_len,
1069                   msg->size, data);
1070     }
1071 
1072     free(target);
1073 
1074     cs_message_queue = g_list_append(cs_message_queue, iov);
1075     crm_cs_flush(&pcmk_cpg_handle);
1076 
1077     return true;
1078 }
1079 
1080 /*!
1081  * \internal
1082  * \brief Send an XML message via Corosync CPG
1083  *
1084  * \param[in] msg   XML message to send
1085  * \param[in] node  Cluster node to send message to
1086  * \param[in] dest  Type of message to send
1087  *
1088  * \return TRUE on success, otherwise FALSE
1089  */
1090 bool
1091 pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1092                    enum crm_ais_msg_types dest)
1093 {
1094     bool rc = true;
1095     GString *data = g_string_sized_new(1024);
1096 
1097     pcmk__xml_string(msg, 0, data, 0);
1098 
1099     rc = send_cpg_text(data->str, false, node, dest);
1100     g_string_free(data, TRUE);
1101     return rc;
1102 }
1103 
1104 // Deprecated functions kept only for backward API compatibility
1105 // LCOV_EXCL_START
1106 
1107 #include <crm/cluster/compat.h>
1108 
1109 gboolean
1110 cluster_connect_cpg(pcmk_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
1111 {
1112     return pcmk__cpg_connect(cluster) == pcmk_rc_ok;
1113 }
1114 
1115 void
1116 cluster_disconnect_cpg(pcmk_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
1117 {
1118     pcmk__cpg_disconnect(cluster);
1119 }
1120 
1121 uint32_t
1122 get_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
1123 {
1124     return pcmk__cpg_local_nodeid(handle);
1125 }
1126 
1127 void
1128 pcmk_cpg_membership(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
1129                     const struct cpg_name *group_name,
1130                     const struct cpg_address *member_list,
1131                     size_t member_list_entries,
1132                     const struct cpg_address *left_list,
1133                     size_t left_list_entries,
1134                     const struct cpg_address *joined_list,
1135                     size_t joined_list_entries)
1136 {
1137     pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries,
1138                          left_list, left_list_entries,
1139                          joined_list, joined_list_entries);
1140 }
1141 
1142 gboolean
1143 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
     /* [previous][next][first][last][top][bottom][index][help] */
1144                   gboolean local, const crm_node_t *node,
1145                   enum crm_ais_msg_types dest)
1146 {
1147     switch (msg_class) {
1148         case crm_class_cluster:
1149             return send_cpg_text(data, local, node, dest);
1150         default:
1151             crm_err("Invalid message class: %d", msg_class);
1152             return FALSE;
1153     }
1154 }
1155 
1156 char *
1157 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid,
     /* [previous][next][first][last][top][bottom][index][help] */
1158                        void *content, uint32_t *kind, const char **from)
1159 {
1160     return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from);
1161 }
1162 
1163 enum crm_ais_msg_types
1164 text2msg_type(const char *text)
     /* [previous][next][first][last][top][bottom][index][help] */
1165 {
1166     int type = crm_msg_none;
1167 
1168     CRM_CHECK(text != NULL, return type);
1169     text = pcmk__message_name(text);
1170     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1171         type = crm_msg_ais;
1172     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1173         type = crm_msg_cib;
1174     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1175         type = crm_msg_crmd;
1176     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1177         type = crm_msg_te;
1178     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1179         type = crm_msg_pe;
1180     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1181         type = crm_msg_lrmd;
1182     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1183         type = crm_msg_stonithd;
1184     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1185         type = crm_msg_stonith_ng;
1186     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1187         type = crm_msg_attrd;
1188 
1189     } else {
1190         /* This will normally be a transient client rather than
1191          * a cluster daemon.  Set the type to the pid of the client
1192          */
1193         int scan_rc = sscanf(text, "%d", &type);
1194 
1195         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1196             /* Ensure it's sane */
1197             type = crm_msg_none;
1198         }
1199     }
1200     return type;
1201 }
1202 
1203 // LCOV_EXCL_STOP
1204 // End deprecated API

/* [previous][next][first][last][top][bottom][index][help] */