root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. cluster_disconnect_cpg
  2. get_local_nodeid
  3. crm_cs_flush_cb
  4. crm_cs_flush
  5. pcmk_cpg_dispatch
  6. ais_dest
  7. msg_type2text
  8. check_message_sanity
  9. pcmk_message_common_cs
  10. cmp_member_list_nodeid
  11. cpgreason2str
  12. peer_name
  13. node_left
  14. pcmk_cpg_membership
  15. cluster_connect_cpg
  16. pcmk__cpg_send_xml
  17. send_cluster_text
  18. text2msg_type

   1 /*
   2  * Copyright 2004-2021 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 #include <bzlib.h>
  12 #include <sys/socket.h>
  13 #include <netinet/in.h>
  14 #include <arpa/inet.h>
  15 #include <netdb.h>
  16 
  17 #include <crm/common/ipc.h>
  18 #include <crm/cluster/internal.h>
  19 #include <crm/common/mainloop.h>
  20 #include <sys/utsname.h>
  21 
  22 #include <qb/qbipc_common.h>
  23 #include <qb/qbipcc.h>
  24 #include <qb/qbutil.h>
  25 
  26 #include <corosync/corodefs.h>
  27 #include <corosync/corotypes.h>
  28 #include <corosync/hdb.h>
  29 #include <corosync/cpg.h>
  30 
  31 #include <crm/msg_xml.h>
  32 
  33 #include <crm/common/ipc_internal.h>  /* PCMK__SPECIAL_PID* */
  34 #include "crmcluster_private.h"
  35 
  36 /* @TODO Once we can update the public API to require crm_cluster_t* in more
  37  *       functions, we can ditch this in favor of cluster->cpg_handle.
  38  */
  39 static cpg_handle_t pcmk_cpg_handle = 0;
  40 
  41 // @TODO These could be moved to crm_cluster_t* at that time as well
  42 static bool cpg_evicted = false;
  43 static GList *cs_message_queue = NULL;
  44 static int cs_message_timer = 0;
  45 
  46 struct pcmk__cpg_host_s {
  47     uint32_t id;
  48     uint32_t pid;
  49     gboolean local;
  50     enum crm_ais_msg_types type;
  51     uint32_t size;
  52     char uname[MAX_NAME];
  53 } __attribute__ ((packed));
  54 
  55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
  56 
  57 struct pcmk__cpg_msg_s {
  58     struct qb_ipc_response_header header __attribute__ ((aligned(8)));
  59     uint32_t id;
  60     gboolean is_compressed;
  61 
  62     pcmk__cpg_host_t host;
  63     pcmk__cpg_host_t sender;
  64 
  65     uint32_t size;
  66     uint32_t compressed_size;
  67     /* 584 bytes */
  68     char data[0];
  69 
  70 } __attribute__ ((packed));
  71 
  72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
  73 
  74 static void crm_cs_flush(gpointer data);
  75 
  76 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
  77 
  78 #define cs_repeat(rc, counter, max, code) do {                          \
  79         rc = code;                                                      \
  80         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \
  81             counter++;                                                  \
  82             crm_debug("Retrying operation after %ds", counter);         \
  83             sleep(counter);                                             \
  84         } else {                                                        \
  85             break;                                                      \
  86         }                                                               \
  87     } while (counter < max)
  88 
  89 /*!
  90  * \brief Disconnect from Corosync CPG
  91  *
  92  * \param[in] Cluster to disconnect
  93  */
  94 void
  95 cluster_disconnect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
  96 {
  97     pcmk_cpg_handle = 0;
  98     if (cluster->cpg_handle) {
  99         crm_trace("Disconnecting CPG");
 100         cpg_leave(cluster->cpg_handle, &cluster->group);
 101         cpg_finalize(cluster->cpg_handle);
 102         cluster->cpg_handle = 0;
 103 
 104     } else {
 105         crm_info("No CPG connection");
 106     }
 107 }
 108 
 109 /*!
 110  * \brief Get the local Corosync node ID (via CPG)
 111  *
 112  * \param[in] handle  CPG connection to use (or 0 to use new connection)
 113  *
 114  * \return Corosync ID of local node (or 0 if not known)
 115  */
 116 uint32_t
 117 get_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
 118 {
 119     cs_error_t rc = CS_OK;
 120     int retries = 0;
 121     static uint32_t local_nodeid = 0;
 122     cpg_handle_t local_handle = handle;
 123     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
 124     int fd = -1;
 125     uid_t found_uid = 0;
 126     gid_t found_gid = 0;
 127     pid_t found_pid = 0;
 128     int rv;
 129 
 130     if(local_nodeid != 0) {
 131         return local_nodeid;
 132     }
 133 
 134     if(handle == 0) {
 135         crm_trace("Creating connection");
 136         cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
 137         if (rc != CS_OK) {
 138             crm_err("Could not connect to the CPG API: %s (%d)",
 139                     cs_strerror(rc), rc);
 140             return 0;
 141         }
 142 
 143         rc = cpg_fd_get(local_handle, &fd);
 144         if (rc != CS_OK) {
 145             crm_err("Could not obtain the CPG API connection: %s (%d)",
 146                     cs_strerror(rc), rc);
 147             goto bail;
 148         }
 149 
 150         /* CPG provider run as root (in given user namespace, anyway)? */
 151         if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 152                                                 &found_uid, &found_gid))) {
 153             crm_err("CPG provider is not authentic:"
 154                     " process %lld (uid: %lld, gid: %lld)",
 155                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 156                     (long long) found_uid, (long long) found_gid);
 157             goto bail;
 158         } else if (rv < 0) {
 159             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 160                     strerror(-rv), -rv);
 161             goto bail;
 162         }
 163     }
 164 
 165     if (rc == CS_OK) {
 166         retries = 0;
 167         crm_trace("Performing lookup");
 168         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
 169     }
 170 
 171     if (rc != CS_OK) {
 172         crm_err("Could not get local node id from the CPG API: %s (%d)",
 173                 pcmk__cs_err_str(rc), rc);
 174     }
 175 
 176 bail:
 177     if(handle == 0) {
 178         crm_trace("Closing connection");
 179         cpg_finalize(local_handle);
 180     }
 181     crm_debug("Local nodeid is %u", local_nodeid);
 182     return local_nodeid;
 183 }
 184 
 185 /*!
 186  * \internal
 187  * \brief Callback function for Corosync message queue timer
 188  *
 189  * \param[in] data  CPG handle
 190  *
 191  * \return FALSE (to indicate to glib that timer should not be removed)
 192  */
 193 static gboolean
 194 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 195 {
 196     cs_message_timer = 0;
 197     crm_cs_flush(data);
 198     return FALSE;
 199 }
 200 
 201 // Send no more than this many CPG messages in one flush
 202 #define CS_SEND_MAX 200
 203 
 204 /*!
 205  * \internal
 206  * \brief Send messages in Corosync CPG message queue
 207  *
 208  * \param[in] data   CPG handle
 209  */
 210 static void
 211 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 212 {
 213     unsigned int sent = 0;
 214     guint queue_len = 0;
 215     cs_error_t rc = 0;
 216     cpg_handle_t *handle = (cpg_handle_t *) data;
 217 
 218     if (*handle == 0) {
 219         crm_trace("Connection is dead");
 220         return;
 221     }
 222 
 223     queue_len = g_list_length(cs_message_queue);
 224     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
 225         crm_err("CPG queue has grown to %d", queue_len);
 226 
 227     } else if (queue_len == CS_SEND_MAX) {
 228         crm_warn("CPG queue has grown to %d", queue_len);
 229     }
 230 
 231     if (cs_message_timer != 0) {
 232         /* There is already a timer, wait until it goes off */
 233         crm_trace("Timer active %d", cs_message_timer);
 234         return;
 235     }
 236 
 237     while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
 238         struct iovec *iov = cs_message_queue->data;
 239 
 240         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 241         if (rc != CS_OK) {
 242             break;
 243         }
 244 
 245         sent++;
 246         crm_trace("CPG message sent, size=%llu",
 247                   (unsigned long long) iov->iov_len);
 248 
 249         cs_message_queue = g_list_remove(cs_message_queue, iov);
 250         free(iov->iov_base);
 251         free(iov);
 252     }
 253 
 254     queue_len -= sent;
 255     do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
 256                "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
 257                sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
 258                (int) rc);
 259 
 260     if (cs_message_queue) {
 261         uint32_t delay_ms = 100;
 262         if (rc != CS_OK) {
 263             /* Proportionally more if sending failed but cap at 1s */
 264             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 265         }
 266         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 267     }
 268 }
 269 
 270 /*!
 271  * \internal
 272  * \brief Dispatch function for CPG handle
 273  *
 274  * \param[in] user_data  Cluster object
 275  *
 276  * \return 0 on success, -1 on error (per mainloop_io_t interface)
 277  */
 278 static int
 279 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 280 {
 281     cs_error_t rc = CS_OK;
 282     crm_cluster_t *cluster = (crm_cluster_t *) user_data;
 283 
 284     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 285     if (rc != CS_OK) {
 286         crm_err("Connection to the CPG API failed: %s (%d)",
 287                 pcmk__cs_err_str(rc), rc);
 288         cpg_finalize(cluster->cpg_handle);
 289         cluster->cpg_handle = 0;
 290         return -1;
 291 
 292     } else if (cpg_evicted) {
 293         crm_err("Evicted from CPG membership");
 294         return -1;
 295     }
 296     return 0;
 297 }
 298 
 299 static inline const char *
 300 ais_dest(const pcmk__cpg_host_t *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 301 {
 302     if (host->local) {
 303         return "local";
 304     } else if (host->size > 0) {
 305         return host->uname;
 306     } else {
 307         return "<all>";
 308     }
 309 }
 310 
 311 static inline const char *
 312 msg_type2text(enum crm_ais_msg_types type)
     /* [previous][next][first][last][top][bottom][index][help] */
 313 {
 314     const char *text = "unknown";
 315 
 316     switch (type) {
 317         case crm_msg_none:
 318             text = "unknown";
 319             break;
 320         case crm_msg_ais:
 321             text = "ais";
 322             break;
 323         case crm_msg_cib:
 324             text = "cib";
 325             break;
 326         case crm_msg_crmd:
 327             text = "crmd";
 328             break;
 329         case crm_msg_pe:
 330             text = "pengine";
 331             break;
 332         case crm_msg_te:
 333             text = "tengine";
 334             break;
 335         case crm_msg_lrmd:
 336             text = "lrmd";
 337             break;
 338         case crm_msg_attrd:
 339             text = "attrd";
 340             break;
 341         case crm_msg_stonithd:
 342             text = "stonithd";
 343             break;
 344         case crm_msg_stonith_ng:
 345             text = "stonith-ng";
 346             break;
 347     }
 348     return text;
 349 }
 350 
 351 /*!
 352  * \internal
 353  * \brief Check whether a Corosync CPG message is valid
 354  *
 355  * \param[in] msg   Corosync CPG message to check
 356  *
 357  * \return true if \p msg is valid, otherwise false
 358  */
 359 static bool
 360 check_message_sanity(const pcmk__cpg_msg_t *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 361 {
 362     int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
 363 
 364     if (payload_size < 1) {
 365         crm_err("%sCPG message %d from %s invalid: "
 366                 "Claimed size of %d bytes is too small "
 367                 CRM_XS " from %s[%u] to %s@%s",
 368                 (msg->is_compressed? "Compressed " : ""),
 369                 msg->id, ais_dest(&(msg->sender)),
 370                 (int) msg->header.size,
 371                 msg_type2text(msg->sender.type), msg->sender.pid,
 372                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 373         return false;
 374     }
 375 
 376     if (msg->header.error != CS_OK) {
 377         crm_err("%sCPG message %d from %s invalid: "
 378                 "Sender indicated error %d "
 379                 CRM_XS " from %s[%u] to %s@%s",
 380                 (msg->is_compressed? "Compressed " : ""),
 381                 msg->id, ais_dest(&(msg->sender)),
 382                 msg->header.error,
 383                 msg_type2text(msg->sender.type), msg->sender.pid,
 384                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 385         return false;
 386     }
 387 
 388     if (msg_data_len(msg) != payload_size) {
 389         crm_err("%sCPG message %d from %s invalid: "
 390                 "Total size %d inconsistent with payload size %d "
 391                 CRM_XS " from %s[%u] to %s@%s",
 392                 (msg->is_compressed? "Compressed " : ""),
 393                 msg->id, ais_dest(&(msg->sender)),
 394                 (int) msg->header.size, (int) msg_data_len(msg),
 395                 msg_type2text(msg->sender.type), msg->sender.pid,
 396                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 397         return false;
 398     }
 399 
 400     if (!msg->is_compressed &&
 401         /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
 402          * but checking the last byte or two should be quick
 403          */
 404         (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
 405          || (msg->data[msg->size - 1] != '\0'))) {
 406         crm_err("CPG message %d from %s invalid: "
 407                 "Payload does not end at byte %llu "
 408                 CRM_XS " from %s[%u] to %s@%s",
 409                 msg->id, ais_dest(&(msg->sender)),
 410                 (unsigned long long) msg->size,
 411                 msg_type2text(msg->sender.type), msg->sender.pid,
 412                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 413         return false;
 414     }
 415 
 416     crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
 417               (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
 418               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
 419               ais_dest(&(msg->sender)),
 420               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 421     return true;
 422 }
 423 
 424 /*!
 425  * \brief Extract text data from a Corosync CPG message
 426  *
 427  * \param[in]  handle   CPG connection (to get local node ID if not yet known)
 428  * \param[in]  nodeid   Corosync ID of node that sent message
 429  * \param[in]  pid      Process ID of message sender (for logging only)
 430  * \param[in]  content  CPG message
 431  * \param[out] kind     If not NULL, will be set to CPG header ID
 432  *                      (which should be an enum crm_ais_msg_class value,
 433  *                      currently always crm_class_cluster)
 434  * \param[out] from     If not NULL, will be set to sender uname
 435  *                      (valid for the lifetime of \p content)
 436  *
 437  * \return Newly allocated string with message data
 438  * \note It is the caller's responsibility to free the return value with free().
 439  */
 440 char *
 441 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
     /* [previous][next][first][last][top][bottom][index][help] */
 442                         uint32_t *kind, const char **from)
 443 {
 444     char *data = NULL;
 445     pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
 446 
 447     if(handle) {
 448         // Do filtering and field massaging
 449         uint32_t local_nodeid = get_local_nodeid(handle);
 450         const char *local_name = get_local_node_name();
 451 
 452         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
 453             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
 454             return NULL;
 455 
 456         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
 457             /* Not for us */
 458             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
 459             return NULL;
 460         } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 461             /* Not for us */
 462             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 463             return NULL;
 464         }
 465 
 466         msg->sender.id = nodeid;
 467         if (msg->sender.size == 0) {
 468             crm_node_t *peer = crm_get_peer(nodeid, NULL);
 469 
 470             if (peer == NULL) {
 471                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 472 
 473             } else if (peer->uname == NULL) {
 474                 crm_err("No uname for peer with nodeid=%u", nodeid);
 475 
 476             } else {
 477                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
 478                 msg->sender.size = strlen(peer->uname);
 479                 memset(msg->sender.uname, 0, MAX_NAME);
 480                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 481             }
 482         }
 483     }
 484 
 485     crm_trace("Got new%s message (size=%d, %d, %d)",
 486               msg->is_compressed ? " compressed" : "",
 487               msg_data_len(msg), msg->size, msg->compressed_size);
 488 
 489     if (kind != NULL) {
 490         *kind = msg->header.id;
 491     }
 492     if (from != NULL) {
 493         *from = msg->sender.uname;
 494     }
 495 
 496     if (msg->is_compressed && msg->size > 0) {
 497         int rc = BZ_OK;
 498         char *uncompressed = NULL;
 499         unsigned int new_size = msg->size + 1;
 500 
 501         if (!check_message_sanity(msg)) {
 502             goto badmsg;
 503         }
 504 
 505         crm_trace("Decompressing message data");
 506         uncompressed = calloc(1, new_size);
 507         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 508 
 509         if (rc != BZ_OK) {
 510             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
 511                     bz2_strerror(rc), rc);
 512             free(uncompressed);
 513             goto badmsg;
 514         }
 515 
 516         CRM_ASSERT(rc == BZ_OK);
 517         CRM_ASSERT(new_size == msg->size);
 518 
 519         data = uncompressed;
 520 
 521     } else if (!check_message_sanity(msg)) {
 522         goto badmsg;
 523 
 524     } else {
 525         data = strdup(msg->data);
 526     }
 527 
 528     // Is this necessary?
 529     crm_get_peer(msg->sender.id, msg->sender.uname);
 530 
 531     crm_trace("Payload: %.200s", data);
 532     return data;
 533 
 534   badmsg:
 535     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 536             " min=%d, total=%d, size=%d, bz2_size=%d",
 537             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 538             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 539             msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
 540             msg->header.size, msg->size, msg->compressed_size);
 541 
 542     free(data);
 543     return NULL;
 544 }
 545 
 546 /*!
 547  * \internal
 548  * \brief Compare cpg_address objects by node ID
 549  *
 550  * \param[in] first   First cpg_address structure to compare
 551  * \param[in] second  Second cpg_address structure to compare
 552  *
 553  * \return Negative number if first's node ID is lower,
 554  *         positive number if first's node ID is greater,
 555  *         or 0 if both node IDs are equal
 556  */
 557 static int
 558 cmp_member_list_nodeid(const void *first, const void *second)
     /* [previous][next][first][last][top][bottom][index][help] */
 559 {
 560     const struct cpg_address *const a = *((const struct cpg_address **) first),
 561                              *const b = *((const struct cpg_address **) second);
 562     if (a->nodeid < b->nodeid) {
 563         return -1;
 564     } else if (a->nodeid > b->nodeid) {
 565         return 1;
 566     }
 567     /* don't bother with "reason" nor "pid" */
 568     return 0;
 569 }
 570 
 571 /*!
 572  * \internal
 573  * \brief Get a readable string equivalent of a cpg_reason_t value
 574  *
 575  * \param[in] reason  CPG reason value
 576  *
 577  * \return Readable string suitable for logging
 578  */
 579 static const char *
 580 cpgreason2str(cpg_reason_t reason)
     /* [previous][next][first][last][top][bottom][index][help] */
 581 {
 582     switch (reason) {
 583         case CPG_REASON_JOIN:       return " via cpg_join";
 584         case CPG_REASON_LEAVE:      return " via cpg_leave";
 585         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 586         case CPG_REASON_NODEUP:     return " via cluster join";
 587         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 588         default:                    break;
 589     }
 590     return "";
 591 }
 592 
 593 /*!
 594  * \internal
 595  * \brief Get a log-friendly node name
 596  *
 597  * \param[in] peer  Node to check
 598  *
 599  * \return Node's uname, or readable string if not known
 600  */
 601 static inline const char *
 602 peer_name(crm_node_t *peer)
     /* [previous][next][first][last][top][bottom][index][help] */
 603 {
 604     if (peer == NULL) {
 605         return "unknown node";
 606     } else if (peer->uname == NULL) {
 607         return "peer node";
 608     } else {
 609         return peer->uname;
 610     }
 611 }
 612 
 613 /*!
 614  * \internal
 615  * \brief Process a CPG peer's leaving the cluster
 616  *
 617  * \param[in] cpg_group_name      CPG group name (for logging)
 618  * \param[in] event_counter       Event number (for logging)
 619  * \param[in] local_nodeid        Node ID of local node
 620  * \param[in] cpg_peer            CPG peer that left
 621  * \param[in] sorted_member_list  List of remaining members, qsort()-ed by ID
 622  * \param[in] member_list_entries Number of entries in \p sorted_member_list
 623  */
 624 static void
 625 node_left(const char *cpg_group_name, int event_counter,
     /* [previous][next][first][last][top][bottom][index][help] */
 626           uint32_t local_nodeid, const struct cpg_address *cpg_peer,
 627           const struct cpg_address **sorted_member_list,
 628           size_t member_list_entries)
 629 {
 630     crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
 631                                                        NULL);
 632     const struct cpg_address **rival = NULL;
 633 
 634     /* Most CPG-related Pacemaker code assumes that only one process on a node
 635      * can be in the process group, but Corosync does not impose this
 636      * limitation, and more than one can be a member in practice due to a
 637      * daemon attempting to start while another instance is already running.
 638      *
 639      * Check for any such duplicate instances, because we don't want to process
 640      * their leaving as if our actual peer left. If the peer that left still has
 641      * an entry in sorted_member_list (with a different PID), we will ignore the
 642      * leaving.
 643      *
 644      * @TODO Track CPG members' PIDs so we can tell exactly who left.
 645      */
 646     if (peer != NULL) {
 647         rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
 648                         sizeof(const struct cpg_address *),
 649                         cmp_member_list_nodeid);
 650     }
 651 
 652     if (rival == NULL) {
 653         crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 654                  cpg_group_name, event_counter, peer_name(peer),
 655                  cpg_peer->nodeid, cpg_peer->pid,
 656                  cpgreason2str(cpg_peer->reason));
 657         if (peer != NULL) {
 658             crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 659                                  OFFLINESTATUS);
 660         }
 661     } else if (cpg_peer->nodeid == local_nodeid) {
 662         crm_warn("Group %s event %d: duplicate local pid %u left%s",
 663                  cpg_group_name, event_counter,
 664                  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
 665     } else {
 666         crm_warn("Group %s event %d: "
 667                  "%s (node %u) duplicate pid %u left%s (%u remains)",
 668                  cpg_group_name, event_counter, peer_name(peer),
 669                  cpg_peer->nodeid, cpg_peer->pid,
 670                  cpgreason2str(cpg_peer->reason), (*rival)->pid);
 671     }
 672 }
 673 
 674 /*!
 675  * \brief Handle a CPG configuration change event
 676  *
 677  * \param[in] handle               CPG connection
 678  * \param[in] cpg_name             CPG group name
 679  * \param[in] member_list          List of current CPG members
 680  * \param[in] member_list_entries  Number of entries in \p member_list
 681  * \param[in] left_list            List of CPG members that left
 682  * \param[in] left_list_entries    Number of entries in \p left_list
 683  * \param[in] joined_list          List of CPG members that joined
 684  * \param[in] joined_list_entries  Number of entries in \p joined_list
 685  */
 686 void
 687 pcmk_cpg_membership(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 688                     const struct cpg_name *groupName,
 689                     const struct cpg_address *member_list, size_t member_list_entries,
 690                     const struct cpg_address *left_list, size_t left_list_entries,
 691                     const struct cpg_address *joined_list, size_t joined_list_entries)
 692 {
 693     int i;
 694     gboolean found = FALSE;
 695     static int counter = 0;
 696     uint32_t local_nodeid = get_local_nodeid(handle);
 697     const struct cpg_address **sorted;
 698 
 699     sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
 700     CRM_ASSERT(sorted != NULL);
 701 
 702     for (size_t iter = 0; iter < member_list_entries; iter++) {
 703         sorted[iter] = member_list + iter;
 704     }
 705     /* so that the cross-matching multiply-subscribed nodes is then cheap */
 706     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 707           cmp_member_list_nodeid);
 708 
 709     for (i = 0; i < left_list_entries; i++) {
 710         node_left(groupName->value, counter, local_nodeid, &left_list[i],
 711                   sorted, member_list_entries);
 712     }
 713     free(sorted);
 714     sorted = NULL;
 715 
 716     for (i = 0; i < joined_list_entries; i++) {
 717         crm_info("Group %s event %d: node %u pid %u joined%s",
 718                  groupName->value, counter, joined_list[i].nodeid,
 719                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 720     }
 721 
 722     for (i = 0; i < member_list_entries; i++) {
 723         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
 724 
 725         if (member_list[i].nodeid == local_nodeid
 726                 && member_list[i].pid != getpid()) {
 727             // See the note in node_left()
 728             crm_warn("Group %s event %d: detected duplicate local pid %u",
 729                      groupName->value, counter, member_list[i].pid);
 730             continue;
 731         }
 732         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 733                  groupName->value, counter, peer_name(peer),
 734                  member_list[i].nodeid, member_list[i].pid);
 735 
 736         /* If the caller left auto-reaping enabled, this will also update the
 737          * state to member.
 738          */
 739         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 740                                     ONLINESTATUS);
 741 
 742         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
 743             /* The node is a CPG member, but we currently think it's not a
 744              * cluster member. This is possible only if auto-reaping was
 745              * disabled. The node may be joining, and we happened to get the CPG
 746              * notification before the quorum notification; or the node may have
 747              * just died, and we are processing its final messages; or a bug
 748              * has affected the peer cache.
 749              */
 750             time_t now = time(NULL);
 751 
 752             if (peer->when_lost == 0) {
 753                 // Track when we first got into this contradictory state
 754                 peer->when_lost = now;
 755 
 756             } else if (now > (peer->when_lost + 60)) {
 757                 // If it persists for more than a minute, update the state
 758                 crm_warn("Node %u is member of group %s but was believed offline",
 759                          member_list[i].nodeid, groupName->value);
 760                 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
 761             }
 762         }
 763 
 764         if (local_nodeid == member_list[i].nodeid) {
 765             found = TRUE;
 766         }
 767     }
 768 
 769     if (!found) {
 770         crm_err("Local node was evicted from group %s", groupName->value);
 771         cpg_evicted = true;
 772     }
 773 
 774     counter++;
 775 }
 776 
 777 /*!
 778  * \brief Connect to Corosync CPG
 779  *
 780  * \param[in] cluster  Cluster object
 781  *
 782  * \return TRUE on success, otherwise FALSE
 783  */
 784 gboolean
 785 cluster_connect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 786 {
 787     cs_error_t rc;
 788     int fd = -1;
 789     int retries = 0;
 790     uint32_t id = 0;
 791     crm_node_t *peer = NULL;
 792     cpg_handle_t handle = 0;
 793     const char *message_name = pcmk__message_name(crm_system_name);
 794     uid_t found_uid = 0;
 795     gid_t found_gid = 0;
 796     pid_t found_pid = 0;
 797     int rv;
 798 
 799     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 800         .dispatch = pcmk_cpg_dispatch,
 801         .destroy = cluster->destroy,
 802     };
 803 
 804     cpg_model_v1_data_t cpg_model_info = {
 805             .model = CPG_MODEL_V1,
 806             .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 807             .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 808             .cpg_totem_confchg_fn = NULL,
 809             .flags = 0,
 810     };
 811 
 812     cpg_evicted = false;
 813     cluster->group.length = 0;
 814     cluster->group.value[0] = 0;
 815 
 816     /* group.value is char[128] */
 817     strncpy(cluster->group.value, message_name, 127);
 818     cluster->group.value[127] = 0;
 819     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 820 
 821     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
 822     if (rc != CS_OK) {
 823         crm_err("Could not connect to the CPG API: %s (%d)",
 824                 cs_strerror(rc), rc);
 825         goto bail;
 826     }
 827 
 828     rc = cpg_fd_get(handle, &fd);
 829     if (rc != CS_OK) {
 830         crm_err("Could not obtain the CPG API connection: %s (%d)",
 831                 cs_strerror(rc), rc);
 832         goto bail;
 833     }
 834 
 835     /* CPG provider run as root (in given user namespace, anyway)? */
 836     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 837                                             &found_uid, &found_gid))) {
 838         crm_err("CPG provider is not authentic:"
 839                 " process %lld (uid: %lld, gid: %lld)",
 840                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 841                 (long long) found_uid, (long long) found_gid);
 842         rc = CS_ERR_ACCESS;
 843         goto bail;
 844     } else if (rv < 0) {
 845         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 846                 strerror(-rv), -rv);
 847         rc = CS_ERR_ACCESS;
 848         goto bail;
 849     }
 850 
 851     id = get_local_nodeid(handle);
 852     if (id == 0) {
 853         crm_err("Could not get local node id from the CPG API");
 854         goto bail;
 855 
 856     }
 857     cluster->nodeid = id;
 858 
 859     retries = 0;
 860     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
 861     if (rc != CS_OK) {
 862         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
 863         goto bail;
 864     }
 865 
 866     pcmk_cpg_handle = handle;
 867     cluster->cpg_handle = handle;
 868     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 869 
 870   bail:
 871     if (rc != CS_OK) {
 872         cpg_finalize(handle);
 873         return FALSE;
 874     }
 875 
 876     peer = crm_get_peer(id, NULL);
 877     crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
 878     return TRUE;
 879 }
 880 
 881 /*!
 882  * \internal
 883  * \brief Send an XML message via Corosync CPG
 884  *
 885  * \param[in] msg   XML message to send
 886  * \param[in] node  Cluster node to send message to
 887  * \param[in] dest  Type of message to send
 888  *
 889  * \return TRUE on success, otherwise FALSE
 890  */
 891 gboolean
 892 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
     /* [previous][next][first][last][top][bottom][index][help] */
 893 {
 894     gboolean rc = TRUE;
 895     char *data = NULL;
 896 
 897     data = dump_xml_unformatted(msg);
 898     rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
 899     free(data);
 900     return rc;
 901 }
 902 
 903 /*!
 904  * \internal
 905  * \brief Send string data via Corosync CPG
 906  *
 907  * \param[in] msg_class  Message class (to set as CPG header ID)
 908  * \param[in] data       Data to send
 909  * \param[in] local      What to set as host "local" value (which is never used)
 910  * \param[in] node       Cluster node to send message to
 911  * \param[in] dest       Type of message to send
 912  *
 913  * \return TRUE on success, otherwise FALSE
 914  */
 915 gboolean
 916 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
     /* [previous][next][first][last][top][bottom][index][help] */
 917                   gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
 918 {
 919     static int msg_id = 0;
 920     static int local_pid = 0;
 921     static int local_name_len = 0;
 922     static const char *local_name = NULL;
 923 
 924     char *target = NULL;
 925     struct iovec *iov;
 926     pcmk__cpg_msg_t *msg = NULL;
 927     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 928 
 929     switch (msg_class) {
 930         case crm_class_cluster:
 931             break;
 932         default:
 933             crm_err("Invalid message class: %d", msg_class);
 934             return FALSE;
 935     }
 936 
 937     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 938 
 939     if (local_name == NULL) {
 940         local_name = get_local_node_name();
 941     }
 942     if ((local_name_len == 0) && (local_name != NULL)) {
 943         local_name_len = strlen(local_name);
 944     }
 945 
 946     if (data == NULL) {
 947         data = "";
 948     }
 949 
 950     if (local_pid == 0) {
 951         local_pid = getpid();
 952     }
 953 
 954     if (sender == crm_msg_none) {
 955         sender = local_pid;
 956     }
 957 
 958     msg = calloc(1, sizeof(pcmk__cpg_msg_t));
 959 
 960     msg_id++;
 961     msg->id = msg_id;
 962     msg->header.id = msg_class;
 963     msg->header.error = CS_OK;
 964 
 965     msg->host.type = dest;
 966     msg->host.local = local;
 967 
 968     if (node) {
 969         if (node->uname) {
 970             target = strdup(node->uname);
 971             msg->host.size = strlen(node->uname);
 972             memset(msg->host.uname, 0, MAX_NAME);
 973             memcpy(msg->host.uname, node->uname, msg->host.size);
 974         } else {
 975             target = crm_strdup_printf("%u", node->id);
 976         }
 977         msg->host.id = node->id;
 978     } else {
 979         target = strdup("all");
 980     }
 981 
 982     msg->sender.id = 0;
 983     msg->sender.type = sender;
 984     msg->sender.pid = local_pid;
 985     msg->sender.size = local_name_len;
 986     memset(msg->sender.uname, 0, MAX_NAME);
 987     if ((local_name != NULL) && (msg->sender.size != 0)) {
 988         memcpy(msg->sender.uname, local_name, msg->sender.size);
 989     }
 990 
 991     msg->size = 1 + strlen(data);
 992     msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
 993 
 994     if (msg->size < CRM_BZ2_THRESHOLD) {
 995         msg = pcmk__realloc(msg, msg->header.size);
 996         memcpy(msg->data, data, msg->size);
 997 
 998     } else {
 999         char *compressed = NULL;
1000         unsigned int new_size = 0;
1001         char *uncompressed = strdup(data);
1002 
1003         if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
1004                            &compressed, &new_size) == pcmk_rc_ok) {
1005 
1006             msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1007             msg = pcmk__realloc(msg, msg->header.size);
1008             memcpy(msg->data, compressed, new_size);
1009 
1010             msg->is_compressed = TRUE;
1011             msg->compressed_size = new_size;
1012 
1013         } else {
1014             // cppcheck seems not to understand the abort logic in pcmk__realloc
1015             // cppcheck-suppress memleak
1016             msg = pcmk__realloc(msg, msg->header.size);
1017             memcpy(msg->data, data, msg->size);
1018         }
1019 
1020         free(uncompressed);
1021         free(compressed);
1022     }
1023 
1024     iov = calloc(1, sizeof(struct iovec));
1025     iov->iov_base = msg;
1026     iov->iov_len = msg->header.size;
1027 
1028     if (msg->compressed_size) {
1029         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1030                   msg->id, target, (unsigned long long) iov->iov_len,
1031                   msg->compressed_size, data);
1032     } else {
1033         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1034                   msg->id, target, (unsigned long long) iov->iov_len,
1035                   msg->size, data);
1036     }
1037     free(target);
1038 
1039     cs_message_queue = g_list_append(cs_message_queue, iov);
1040     crm_cs_flush(&pcmk_cpg_handle);
1041 
1042     return TRUE;
1043 }
1044 
1045 /*!
1046  * \brief Get the message type equivalent of a string
1047  *
1048  * \param[in] text  String of message type
1049  *
1050  * \return Message type equivalent of \p text
1051  */
1052 enum crm_ais_msg_types
1053 text2msg_type(const char *text)
     /* [previous][next][first][last][top][bottom][index][help] */
1054 {
1055     int type = crm_msg_none;
1056 
1057     CRM_CHECK(text != NULL, return type);
1058     text = pcmk__message_name(text);
1059     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1060         type = crm_msg_ais;
1061     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1062         type = crm_msg_cib;
1063     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1064         type = crm_msg_crmd;
1065     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1066         type = crm_msg_te;
1067     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1068         type = crm_msg_pe;
1069     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1070         type = crm_msg_lrmd;
1071     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1072         type = crm_msg_stonithd;
1073     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1074         type = crm_msg_stonith_ng;
1075     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1076         type = crm_msg_attrd;
1077 
1078     } else {
1079         /* This will normally be a transient client rather than
1080          * a cluster daemon.  Set the type to the pid of the client
1081          */
1082         int scan_rc = sscanf(text, "%d", &type);
1083 
1084         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1085             /* Ensure it's sane */
1086             type = crm_msg_none;
1087         }
1088     }
1089     return type;
1090 }

/* [previous][next][first][last][top][bottom][index][help] */