root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. cluster_disconnect_cpg
  2. get_local_nodeid
  3. crm_cs_flush_cb
  4. crm_cs_flush
  5. pcmk_cpg_dispatch
  6. ais_dest
  7. msg_type2text
  8. check_message_sanity
  9. pcmk_message_common_cs
  10. cmp_member_list_nodeid
  11. cpgreason2str
  12. peer_name
  13. pcmk_cpg_membership
  14. cluster_connect_cpg
  15. pcmk__cpg_send_xml
  16. send_cluster_text
  17. text2msg_type

   1 /*
   2  * Copyright 2004-2020 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 #include <bzlib.h>
  12 #include <sys/socket.h>
  13 #include <netinet/in.h>
  14 #include <arpa/inet.h>
  15 #include <netdb.h>
  16 
  17 #include <crm/common/ipc.h>
  18 #include <crm/cluster/internal.h>
  19 #include <crm/common/mainloop.h>
  20 #include <sys/utsname.h>
  21 
  22 #include <qb/qbipc_common.h>
  23 #include <qb/qbipcc.h>
  24 #include <qb/qbutil.h>
  25 
  26 #include <corosync/corodefs.h>
  27 #include <corosync/corotypes.h>
  28 #include <corosync/hdb.h>
  29 #include <corosync/cpg.h>
  30 
  31 #include <crm/msg_xml.h>
  32 
  33 #include <crm/common/ipc_internal.h>  /* PCMK__SPECIAL_PID* */
  34 #include "crmcluster_private.h"
  35 
  36 /* @TODO Once we can update the public API to require crm_cluster_t* in more
  37  *       functions, we can ditch this in favor of cluster->cpg_handle.
  38  */
  39 static cpg_handle_t pcmk_cpg_handle = 0;
  40 
  41 // @TODO These could be moved to crm_cluster_t* at that time as well
  42 static bool cpg_evicted = false;
  43 static GList *cs_message_queue = NULL;
  44 static int cs_message_timer = 0;
  45 
  46 struct pcmk__cpg_host_s {
  47     uint32_t id;
  48     uint32_t pid;
  49     gboolean local;
  50     enum crm_ais_msg_types type;
  51     uint32_t size;
  52     char uname[MAX_NAME];
  53 } __attribute__ ((packed));
  54 
  55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
  56 
  57 struct pcmk__cpg_msg_s {
  58     struct qb_ipc_response_header header __attribute__ ((aligned(8)));
  59     uint32_t id;
  60     gboolean is_compressed;
  61 
  62     pcmk__cpg_host_t host;
  63     pcmk__cpg_host_t sender;
  64 
  65     uint32_t size;
  66     uint32_t compressed_size;
  67     /* 584 bytes */
  68     char data[0];
  69 
  70 } __attribute__ ((packed));
  71 
  72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
  73 
  74 static void crm_cs_flush(gpointer data);
  75 
  76 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
  77 
  78 #define cs_repeat(rc, counter, max, code) do {                          \
  79         rc = code;                                                      \
  80         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \
  81             counter++;                                                  \
  82             crm_debug("Retrying operation after %ds", counter);         \
  83             sleep(counter);                                             \
  84         } else {                                                        \
  85             break;                                                      \
  86         }                                                               \
  87     } while (counter < max)
  88 
  89 /*!
  90  * \brief Disconnect from Corosync CPG
  91  *
  92  * \param[in] Cluster to disconnect
  93  */
  94 void
  95 cluster_disconnect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
  96 {
  97     pcmk_cpg_handle = 0;
  98     if (cluster->cpg_handle) {
  99         crm_trace("Disconnecting CPG");
 100         cpg_leave(cluster->cpg_handle, &cluster->group);
 101         cpg_finalize(cluster->cpg_handle);
 102         cluster->cpg_handle = 0;
 103 
 104     } else {
 105         crm_info("No CPG connection");
 106     }
 107 }
 108 
 109 /*!
 110  * \brief Get the local Corosync node ID (via CPG)
 111  *
 112  * \param[in] handle  CPG connection to use (or 0 to use new connection)
 113  *
 114  * \return Corosync ID of local node (or 0 if not known)
 115  */
 116 uint32_t
 117 get_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
 118 {
 119     cs_error_t rc = CS_OK;
 120     int retries = 0;
 121     static uint32_t local_nodeid = 0;
 122     cpg_handle_t local_handle = handle;
 123     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
 124     int fd = -1;
 125     uid_t found_uid = 0;
 126     gid_t found_gid = 0;
 127     pid_t found_pid = 0;
 128     int rv;
 129 
 130     if(local_nodeid != 0) {
 131         return local_nodeid;
 132     }
 133 
 134     if(handle == 0) {
 135         crm_trace("Creating connection");
 136         cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
 137         if (rc != CS_OK) {
 138             crm_err("Could not connect to the CPG API: %s (%d)",
 139                     cs_strerror(rc), rc);
 140             return 0;
 141         }
 142 
 143         rc = cpg_fd_get(local_handle, &fd);
 144         if (rc != CS_OK) {
 145             crm_err("Could not obtain the CPG API connection: %s (%d)",
 146                     cs_strerror(rc), rc);
 147             goto bail;
 148         }
 149 
 150         /* CPG provider run as root (in given user namespace, anyway)? */
 151         if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 152                                                 &found_uid, &found_gid))) {
 153             crm_err("CPG provider is not authentic:"
 154                     " process %lld (uid: %lld, gid: %lld)",
 155                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 156                     (long long) found_uid, (long long) found_gid);
 157             goto bail;
 158         } else if (rv < 0) {
 159             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 160                     strerror(-rv), -rv);
 161             goto bail;
 162         }
 163     }
 164 
 165     if (rc == CS_OK) {
 166         retries = 0;
 167         crm_trace("Performing lookup");
 168         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
 169     }
 170 
 171     if (rc != CS_OK) {
 172         crm_err("Could not get local node id from the CPG API: %s (%d)",
 173                 pcmk__cs_err_str(rc), rc);
 174     }
 175 
 176 bail:
 177     if(handle == 0) {
 178         crm_trace("Closing connection");
 179         cpg_finalize(local_handle);
 180     }
 181     crm_debug("Local nodeid is %u", local_nodeid);
 182     return local_nodeid;
 183 }
 184 
 185 /*!
 186  * \internal
 187  * \brief Callback function for Corosync message queue timer
 188  *
 189  * \param[in] data  CPG handle
 190  *
 191  * \return FALSE (to indicate to glib that timer should not be removed)
 192  */
 193 static gboolean
 194 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 195 {
 196     cs_message_timer = 0;
 197     crm_cs_flush(data);
 198     return FALSE;
 199 }
 200 
 201 // Send no more than this many CPG messages in one flush
 202 #define CS_SEND_MAX 200
 203 
 204 /*!
 205  * \internal
 206  * \brief Send messages in Corosync CPG message queue
 207  *
 208  * \param[in] data   CPG handle
 209  */
 210 static void
 211 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 212 {
 213     unsigned int sent = 0;
 214     guint queue_len = 0;
 215     cs_error_t rc = 0;
 216     cpg_handle_t *handle = (cpg_handle_t *) data;
 217 
 218     if (*handle == 0) {
 219         crm_trace("Connection is dead");
 220         return;
 221     }
 222 
 223     queue_len = g_list_length(cs_message_queue);
 224     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
 225         crm_err("CPG queue has grown to %d", queue_len);
 226 
 227     } else if (queue_len == CS_SEND_MAX) {
 228         crm_warn("CPG queue has grown to %d", queue_len);
 229     }
 230 
 231     if (cs_message_timer != 0) {
 232         /* There is already a timer, wait until it goes off */
 233         crm_trace("Timer active %d", cs_message_timer);
 234         return;
 235     }
 236 
 237     while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
 238         struct iovec *iov = cs_message_queue->data;
 239 
 240         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 241         if (rc != CS_OK) {
 242             break;
 243         }
 244 
 245         sent++;
 246         crm_trace("CPG message sent, size=%llu",
 247                   (unsigned long long) iov->iov_len);
 248 
 249         cs_message_queue = g_list_remove(cs_message_queue, iov);
 250         free(iov->iov_base);
 251         free(iov);
 252     }
 253 
 254     queue_len -= sent;
 255     if ((sent > 1) || (cs_message_queue != NULL)) {
 256         crm_info("Sent %u CPG messages  (%d remaining): %s (%d)",
 257                  sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
 258     } else {
 259         crm_trace("Sent %u CPG messages  (%d remaining): %s (%d)",
 260                   sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
 261     }
 262 
 263     if (cs_message_queue) {
 264         uint32_t delay_ms = 100;
 265         if (rc != CS_OK) {
 266             /* Proportionally more if sending failed but cap at 1s */
 267             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 268         }
 269         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 270     }
 271 }
 272 
 273 /*!
 274  * \internal
 275  * \brief Dispatch function for CPG handle
 276  *
 277  * \param[in] user_data  Cluster object
 278  *
 279  * \return 0 on success, -1 on error (per mainloop_io_t interface)
 280  */
 281 static int
 282 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 283 {
 284     cs_error_t rc = CS_OK;
 285     crm_cluster_t *cluster = (crm_cluster_t *) user_data;
 286 
 287     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 288     if (rc != CS_OK) {
 289         crm_err("Connection to the CPG API failed: %s (%d)",
 290                 pcmk__cs_err_str(rc), rc);
 291         cpg_finalize(cluster->cpg_handle);
 292         cluster->cpg_handle = 0;
 293         return -1;
 294 
 295     } else if (cpg_evicted) {
 296         crm_err("Evicted from CPG membership");
 297         return -1;
 298     }
 299     return 0;
 300 }
 301 
 302 static inline const char *
 303 ais_dest(const pcmk__cpg_host_t *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 304 {
 305     if (host->local) {
 306         return "local";
 307     } else if (host->size > 0) {
 308         return host->uname;
 309     } else {
 310         return "<all>";
 311     }
 312 }
 313 
 314 static inline const char *
 315 msg_type2text(enum crm_ais_msg_types type)
     /* [previous][next][first][last][top][bottom][index][help] */
 316 {
 317     const char *text = "unknown";
 318 
 319     switch (type) {
 320         case crm_msg_none:
 321             text = "unknown";
 322             break;
 323         case crm_msg_ais:
 324             text = "ais";
 325             break;
 326         case crm_msg_cib:
 327             text = "cib";
 328             break;
 329         case crm_msg_crmd:
 330             text = "crmd";
 331             break;
 332         case crm_msg_pe:
 333             text = "pengine";
 334             break;
 335         case crm_msg_te:
 336             text = "tengine";
 337             break;
 338         case crm_msg_lrmd:
 339             text = "lrmd";
 340             break;
 341         case crm_msg_attrd:
 342             text = "attrd";
 343             break;
 344         case crm_msg_stonithd:
 345             text = "stonithd";
 346             break;
 347         case crm_msg_stonith_ng:
 348             text = "stonith-ng";
 349             break;
 350     }
 351     return text;
 352 }
 353 
 354 /*!
 355  * \internal
 356  * \brief Check whether a Corosync CPG message is valid
 357  *
 358  * \param[in] msg   Corosync CPG message to check
 359  *
 360  * \return true if \p msg is valid, otherwise false
 361  */
 362 static bool
 363 check_message_sanity(const pcmk__cpg_msg_t *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 364 {
 365     int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
 366 
 367     if (payload_size < 1) {
 368         crm_err("%sCPG message %d from %s invalid: "
 369                 "Claimed size of %d bytes is too small "
 370                 CRM_XS " from %s[%u] to %s@%s",
 371                 (msg->is_compressed? "Compressed " : ""),
 372                 msg->id, ais_dest(&(msg->sender)),
 373                 (int) msg->header.size,
 374                 msg_type2text(msg->sender.type), msg->sender.pid,
 375                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 376         return false;
 377     }
 378 
 379     if (msg->header.error != CS_OK) {
 380         crm_err("%sCPG message %d from %s invalid: "
 381                 "Sender indicated error %d "
 382                 CRM_XS " from %s[%u] to %s@%s",
 383                 (msg->is_compressed? "Compressed " : ""),
 384                 msg->id, ais_dest(&(msg->sender)),
 385                 msg->header.error,
 386                 msg_type2text(msg->sender.type), msg->sender.pid,
 387                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 388         return false;
 389     }
 390 
 391     if (msg_data_len(msg) != payload_size) {
 392         crm_err("%sCPG message %d from %s invalid: "
 393                 "Total size %d inconsistent with payload size %d "
 394                 CRM_XS " from %s[%u] to %s@%s",
 395                 (msg->is_compressed? "Compressed " : ""),
 396                 msg->id, ais_dest(&(msg->sender)),
 397                 (int) msg->header.size, (int) msg_data_len(msg),
 398                 msg_type2text(msg->sender.type), msg->sender.pid,
 399                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 400         return false;
 401     }
 402 
 403     if (!msg->is_compressed &&
 404         /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
 405          * but checking the last byte or two should be quick
 406          */
 407         (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
 408          || (msg->data[msg->size - 1] != '\0'))) {
 409         crm_err("CPG message %d from %s invalid: "
 410                 "Payload does not end at byte %llu "
 411                 CRM_XS " from %s[%u] to %s@%s",
 412                 msg->id, ais_dest(&(msg->sender)),
 413                 (unsigned long long) msg->size,
 414                 msg_type2text(msg->sender.type), msg->sender.pid,
 415                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 416         return false;
 417     }
 418 
 419     crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
 420               (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
 421               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
 422               ais_dest(&(msg->sender)),
 423               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 424     return true;
 425 }
 426 
 427 /*!
 428  * \brief Extract text data from a Corosync CPG message
 429  *
 430  * \param[in]  handle   CPG connection (to get local node ID if not yet known)
 431  * \param[in]  nodeid   Corosync ID of node that sent message
 432  * \param[in]  pid      Process ID of message sender (for logging only)
 433  * \param[in]  content  CPG message
 434  * \param[out] kind     If not NULL, will be set to CPG header ID
 435  *                      (which should be an enum crm_ais_msg_class value,
 436  *                      currently always crm_class_cluster)
 437  * \param[out] from     If not NULL, will be set to sender uname
 438  *                      (valid for the lifetime of \p content)
 439  *
 440  * \return Newly allocated string with message data
 441  * \note It is the caller's responsibility to free the return value with free().
 442  */
 443 char *
 444 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
     /* [previous][next][first][last][top][bottom][index][help] */
 445                         uint32_t *kind, const char **from)
 446 {
 447     char *data = NULL;
 448     pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
 449 
 450     if(handle) {
 451         // Do filtering and field massaging
 452         uint32_t local_nodeid = get_local_nodeid(handle);
 453         const char *local_name = get_local_node_name();
 454 
 455         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
 456             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
 457             return NULL;
 458 
 459         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
 460             /* Not for us */
 461             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
 462             return NULL;
 463         } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 464             /* Not for us */
 465             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 466             return NULL;
 467         }
 468 
 469         msg->sender.id = nodeid;
 470         if (msg->sender.size == 0) {
 471             crm_node_t *peer = crm_get_peer(nodeid, NULL);
 472 
 473             if (peer == NULL) {
 474                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 475 
 476             } else if (peer->uname == NULL) {
 477                 crm_err("No uname for peer with nodeid=%u", nodeid);
 478 
 479             } else {
 480                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
 481                 msg->sender.size = strlen(peer->uname);
 482                 memset(msg->sender.uname, 0, MAX_NAME);
 483                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 484             }
 485         }
 486     }
 487 
 488     crm_trace("Got new%s message (size=%d, %d, %d)",
 489               msg->is_compressed ? " compressed" : "",
 490               msg_data_len(msg), msg->size, msg->compressed_size);
 491 
 492     if (kind != NULL) {
 493         *kind = msg->header.id;
 494     }
 495     if (from != NULL) {
 496         *from = msg->sender.uname;
 497     }
 498 
 499     if (msg->is_compressed && msg->size > 0) {
 500         int rc = BZ_OK;
 501         char *uncompressed = NULL;
 502         unsigned int new_size = msg->size + 1;
 503 
 504         if (!check_message_sanity(msg)) {
 505             goto badmsg;
 506         }
 507 
 508         crm_trace("Decompressing message data");
 509         uncompressed = calloc(1, new_size);
 510         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 511 
 512         if (rc != BZ_OK) {
 513             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
 514                     bz2_strerror(rc), rc);
 515             free(uncompressed);
 516             goto badmsg;
 517         }
 518 
 519         CRM_ASSERT(rc == BZ_OK);
 520         CRM_ASSERT(new_size == msg->size);
 521 
 522         data = uncompressed;
 523 
 524     } else if (!check_message_sanity(msg)) {
 525         goto badmsg;
 526 
 527     } else {
 528         data = strdup(msg->data);
 529     }
 530 
 531     // Is this necessary?
 532     crm_get_peer(msg->sender.id, msg->sender.uname);
 533 
 534     crm_trace("Payload: %.200s", data);
 535     return data;
 536 
 537   badmsg:
 538     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 539             " min=%d, total=%d, size=%d, bz2_size=%d",
 540             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 541             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 542             msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
 543             msg->header.size, msg->size, msg->compressed_size);
 544 
 545     free(data);
 546     return NULL;
 547 }
 548 
 549 /*!
 550  * \internal
 551  * \brief Compare cpg_address objects by node ID
 552  *
 553  * \param[in] first   First cpg_address structure to compare
 554  * \param[in] second  Second cpg_address structure to compare
 555  *
 556  * \return Negative number if first's node ID is lower,
 557  *         positive number if first's node ID is greater,
 558  *         or 0 if both node IDs are equal
 559  */
 560 static int
 561 cmp_member_list_nodeid(const void *first, const void *second)
     /* [previous][next][first][last][top][bottom][index][help] */
 562 {
 563     const struct cpg_address *const a = *((const struct cpg_address **) first),
 564                              *const b = *((const struct cpg_address **) second);
 565     if (a->nodeid < b->nodeid) {
 566         return -1;
 567     } else if (a->nodeid > b->nodeid) {
 568         return 1;
 569     }
 570     /* don't bother with "reason" nor "pid" */
 571     return 0;
 572 }
 573 
 574 /*!
 575  * \internal
 576  * \brief Get a readable string equivalent of a cpg_reason_t value
 577  *
 578  * \param[in] reason  CPG reason value
 579  *
 580  * \return Readable string suitable for logging
 581  */
 582 static const char *
 583 cpgreason2str(cpg_reason_t reason)
     /* [previous][next][first][last][top][bottom][index][help] */
 584 {
 585     switch (reason) {
 586         case CPG_REASON_JOIN:       return " via cpg_join";
 587         case CPG_REASON_LEAVE:      return " via cpg_leave";
 588         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 589         case CPG_REASON_NODEUP:     return " via cluster join";
 590         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 591         default:                    break;
 592     }
 593     return "";
 594 }
 595 
 596 /*!
 597  * \internal
 598  * \brief Get a log-friendly node name
 599  *
 600  * \param[in] peer  Node to check
 601  *
 602  * \return Node's uname, or readable string if not known
 603  */
 604 static inline const char *
 605 peer_name(crm_node_t *peer)
     /* [previous][next][first][last][top][bottom][index][help] */
 606 {
 607     if (peer == NULL) {
 608         return "unknown node";
 609     } else if (peer->uname == NULL) {
 610         return "peer node";
 611     } else {
 612         return peer->uname;
 613     }
 614 }
 615 
 616 /*!
 617  * \brief Handle a CPG configuration change event
 618  *
 619  * \param[in] handle               CPG connection
 620  * \param[in] cpg_name             CPG group name
 621  * \param[in] member_list          List of current CPG members
 622  * \param[in] member_list_entries  Number of entries in \p member_list
 623  * \param[in] left_list            List of CPG members that left
 624  * \param[in] left_list_entries    Number of entries in \p left_list
 625  * \param[in] joined_list          List of CPG members that joined
 626  * \param[in] joined_list_entries  Number of entries in \p joined_list
 627  */
 628 void
 629 pcmk_cpg_membership(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 630                     const struct cpg_name *groupName,
 631                     const struct cpg_address *member_list, size_t member_list_entries,
 632                     const struct cpg_address *left_list, size_t left_list_entries,
 633                     const struct cpg_address *joined_list, size_t joined_list_entries)
 634 {
 635     int i;
 636     gboolean found = FALSE;
 637     static int counter = 0;
 638     uint32_t local_nodeid = get_local_nodeid(handle);
 639     const struct cpg_address *key, **sorted;
 640 
 641     sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
 642     CRM_ASSERT(sorted != NULL);
 643 
 644     for (size_t iter = 0; iter < member_list_entries; iter++) {
 645         sorted[iter] = member_list + iter;
 646     }
 647     /* so that the cross-matching multiply-subscribed nodes is then cheap */
 648     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 649           cmp_member_list_nodeid);
 650 
 651     for (i = 0; i < left_list_entries; i++) {
 652         crm_node_t *peer = pcmk__search_cluster_node_cache(left_list[i].nodeid,
 653                                                            NULL);
 654         const struct cpg_address **rival = NULL;
 655 
 656         /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
 657            and not playing by this rule may go wild in case of multiple
 658            residual instances of the same pacemaker daemon at the same node
 659            -- we must ensure that the possible local rival(s) won't make us
 660            cry out and bail (e.g. when they quit themselves), since all the
 661            surrounding logic denies this simple fact that the full membership
 662            is discriminated also per the PID of the process beside mere node
 663            ID (and implicitly, group ID); practically, this will be sound in
 664            terms of not preventing progress, since all the CPG joiners are
 665            also API end-point carriers, and that's what matters locally
 666            (who's the winner);
 667            remotely, we will just compare leave_list and member_list and if
 668            the left process has its node retained in member_list (under some
 669            other PID, anyway) we will just ignore it as well
 670            XXX: long-term fix is to establish in-out PID-aware tracking? */
 671         if (peer) {
 672             key = &left_list[i];
 673             rival = bsearch(&key, sorted, member_list_entries,
 674                             sizeof(const struct cpg_address *),
 675                             cmp_member_list_nodeid);
 676         }
 677 
 678         if (rival == NULL) {
 679             crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 680                      groupName->value, counter, peer_name(peer),
 681                      left_list[i].nodeid, left_list[i].pid,
 682                      cpgreason2str(left_list[i].reason));
 683             if (peer) {
 684                 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 685                                      OFFLINESTATUS);
 686             }
 687         } else if (left_list[i].nodeid == local_nodeid) {
 688             crm_warn("Group %s event %d: duplicate local pid %u left%s",
 689                      groupName->value, counter,
 690                      left_list[i].pid, cpgreason2str(left_list[i].reason));
 691         } else {
 692             crm_warn("Group %s event %d: "
 693                      "%s (node %u) duplicate pid %u left%s (%u remains)",
 694                      groupName->value, counter, peer_name(peer),
 695                      left_list[i].nodeid, left_list[i].pid,
 696                      cpgreason2str(left_list[i].reason), (*rival)->pid);
 697         }
 698     }
 699     free(sorted);
 700     sorted = NULL;
 701 
 702     for (i = 0; i < joined_list_entries; i++) {
 703         crm_info("Group %s event %d: node %u pid %u joined%s",
 704                  groupName->value, counter, joined_list[i].nodeid,
 705                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 706     }
 707 
 708     for (i = 0; i < member_list_entries; i++) {
 709         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
 710 
 711         if (member_list[i].nodeid == local_nodeid
 712                 && member_list[i].pid != getpid()) {
 713             /* see the note above */
 714             crm_warn("Group %s event %d: detected duplicate local pid %u",
 715                      groupName->value, counter, member_list[i].pid);
 716             continue;
 717         }
 718         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 719                  groupName->value, counter, peer_name(peer),
 720                  member_list[i].nodeid, member_list[i].pid);
 721 
 722         /* If the caller left auto-reaping enabled, this will also update the
 723          * state to member.
 724          */
 725         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 726                                     ONLINESTATUS);
 727 
 728         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
 729             /* The node is a CPG member, but we currently think it's not a
 730              * cluster member. This is possible only if auto-reaping was
 731              * disabled. The node may be joining, and we happened to get the CPG
 732              * notification before the quorum notification; or the node may have
 733              * just died, and we are processing its final messages; or a bug
 734              * has affected the peer cache.
 735              */
 736             time_t now = time(NULL);
 737 
 738             if (peer->when_lost == 0) {
 739                 // Track when we first got into this contradictory state
 740                 peer->when_lost = now;
 741 
 742             } else if (now > (peer->when_lost + 60)) {
 743                 // If it persists for more than a minute, update the state
 744                 crm_warn("Node %u is member of group %s but was believed offline",
 745                          member_list[i].nodeid, groupName->value);
 746                 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
 747             }
 748         }
 749 
 750         if (local_nodeid == member_list[i].nodeid) {
 751             found = TRUE;
 752         }
 753     }
 754 
 755     if (!found) {
 756         crm_err("Local node was evicted from group %s", groupName->value);
 757         cpg_evicted = true;
 758     }
 759 
 760     counter++;
 761 }
 762 
 763 /*!
 764  * \brief Connect to Corosync CPG
 765  *
 766  * \param[in] cluster  Cluster object
 767  *
 768  * \return TRUE on success, otherwise FALSE
 769  */
 770 gboolean
 771 cluster_connect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 772 {
 773     cs_error_t rc;
 774     int fd = -1;
 775     int retries = 0;
 776     uint32_t id = 0;
 777     crm_node_t *peer = NULL;
 778     cpg_handle_t handle = 0;
 779     const char *message_name = pcmk__message_name(crm_system_name);
 780     uid_t found_uid = 0;
 781     gid_t found_gid = 0;
 782     pid_t found_pid = 0;
 783     int rv;
 784 
 785     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 786         .dispatch = pcmk_cpg_dispatch,
 787         .destroy = cluster->destroy,
 788     };
 789 
 790     cpg_model_v1_data_t cpg_model_info = {
 791             .model = CPG_MODEL_V1,
 792             .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 793             .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 794             .cpg_totem_confchg_fn = NULL,
 795             .flags = 0,
 796     };
 797 
 798     cpg_evicted = false;
 799     cluster->group.length = 0;
 800     cluster->group.value[0] = 0;
 801 
 802     /* group.value is char[128] */
 803     strncpy(cluster->group.value, message_name, 127);
 804     cluster->group.value[127] = 0;
 805     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 806 
 807     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
 808     if (rc != CS_OK) {
 809         crm_err("Could not connect to the CPG API: %s (%d)",
 810                 cs_strerror(rc), rc);
 811         goto bail;
 812     }
 813 
 814     rc = cpg_fd_get(handle, &fd);
 815     if (rc != CS_OK) {
 816         crm_err("Could not obtain the CPG API connection: %s (%d)",
 817                 cs_strerror(rc), rc);
 818         goto bail;
 819     }
 820 
 821     /* CPG provider run as root (in given user namespace, anyway)? */
 822     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 823                                             &found_uid, &found_gid))) {
 824         crm_err("CPG provider is not authentic:"
 825                 " process %lld (uid: %lld, gid: %lld)",
 826                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 827                 (long long) found_uid, (long long) found_gid);
 828         rc = CS_ERR_ACCESS;
 829         goto bail;
 830     } else if (rv < 0) {
 831         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 832                 strerror(-rv), -rv);
 833         rc = CS_ERR_ACCESS;
 834         goto bail;
 835     }
 836 
 837     id = get_local_nodeid(handle);
 838     if (id == 0) {
 839         crm_err("Could not get local node id from the CPG API");
 840         goto bail;
 841 
 842     }
 843     cluster->nodeid = id;
 844 
 845     retries = 0;
 846     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
 847     if (rc != CS_OK) {
 848         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
 849         goto bail;
 850     }
 851 
 852     pcmk_cpg_handle = handle;
 853     cluster->cpg_handle = handle;
 854     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 855 
 856   bail:
 857     if (rc != CS_OK) {
 858         cpg_finalize(handle);
 859         return FALSE;
 860     }
 861 
 862     peer = crm_get_peer(id, NULL);
 863     crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
 864     return TRUE;
 865 }
 866 
 867 /*!
 868  * \internal
 869  * \brief Send an XML message via Corosync CPG
 870  *
 871  * \param[in] msg   XML message to send
 872  * \param[in] node  Cluster node to send message to
 873  * \param[in] dest  Type of message to send
 874  *
 875  * \return TRUE on success, otherwise FALSE
 876  */
 877 gboolean
 878 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
     /* [previous][next][first][last][top][bottom][index][help] */
 879 {
 880     gboolean rc = TRUE;
 881     char *data = NULL;
 882 
 883     data = dump_xml_unformatted(msg);
 884     rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
 885     free(data);
 886     return rc;
 887 }
 888 
 889 /*!
 890  * \internal
 891  * \brief Send string data via Corosync CPG
 892  *
 893  * \param[in] msg_class  Message class (to set as CPG header ID)
 894  * \param[in] data       Data to send
 895  * \param[in] local      What to set as host "local" value (which is never used)
 896  * \param[in] node       Cluster node to send message to
 897  * \param[in] dest       Type of message to send
 898  *
 899  * \return TRUE on success, otherwise FALSE
 900  */
 901 gboolean
 902 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
     /* [previous][next][first][last][top][bottom][index][help] */
 903                   gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
 904 {
 905     static int msg_id = 0;
 906     static int local_pid = 0;
 907     static int local_name_len = 0;
 908     static const char *local_name = NULL;
 909 
 910     char *target = NULL;
 911     struct iovec *iov;
 912     pcmk__cpg_msg_t *msg = NULL;
 913     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 914 
 915     switch (msg_class) {
 916         case crm_class_cluster:
 917             break;
 918         default:
 919             crm_err("Invalid message class: %d", msg_class);
 920             return FALSE;
 921     }
 922 
 923     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 924 
 925     if (local_name == NULL) {
 926         local_name = get_local_node_name();
 927     }
 928     if ((local_name_len == 0) && (local_name != NULL)) {
 929         local_name_len = strlen(local_name);
 930     }
 931 
 932     if (data == NULL) {
 933         data = "";
 934     }
 935 
 936     if (local_pid == 0) {
 937         local_pid = getpid();
 938     }
 939 
 940     if (sender == crm_msg_none) {
 941         sender = local_pid;
 942     }
 943 
 944     msg = calloc(1, sizeof(pcmk__cpg_msg_t));
 945 
 946     msg_id++;
 947     msg->id = msg_id;
 948     msg->header.id = msg_class;
 949     msg->header.error = CS_OK;
 950 
 951     msg->host.type = dest;
 952     msg->host.local = local;
 953 
 954     if (node) {
 955         if (node->uname) {
 956             target = strdup(node->uname);
 957             msg->host.size = strlen(node->uname);
 958             memset(msg->host.uname, 0, MAX_NAME);
 959             memcpy(msg->host.uname, node->uname, msg->host.size);
 960         } else {
 961             target = crm_strdup_printf("%u", node->id);
 962         }
 963         msg->host.id = node->id;
 964     } else {
 965         target = strdup("all");
 966     }
 967 
 968     msg->sender.id = 0;
 969     msg->sender.type = sender;
 970     msg->sender.pid = local_pid;
 971     msg->sender.size = local_name_len;
 972     memset(msg->sender.uname, 0, MAX_NAME);
 973     if ((local_name != NULL) && (msg->sender.size != 0)) {
 974         memcpy(msg->sender.uname, local_name, msg->sender.size);
 975     }
 976 
 977     msg->size = 1 + strlen(data);
 978     msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
 979 
 980     if (msg->size < CRM_BZ2_THRESHOLD) {
 981         msg = pcmk__realloc(msg, msg->header.size);
 982         memcpy(msg->data, data, msg->size);
 983 
 984     } else {
 985         char *compressed = NULL;
 986         unsigned int new_size = 0;
 987         char *uncompressed = strdup(data);
 988 
 989         if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
 990                            &compressed, &new_size) == pcmk_rc_ok) {
 991 
 992             msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
 993             msg = pcmk__realloc(msg, msg->header.size);
 994             memcpy(msg->data, compressed, new_size);
 995 
 996             msg->is_compressed = TRUE;
 997             msg->compressed_size = new_size;
 998 
 999         } else {
1000             // cppcheck seems not to understand the abort logic in pcmk__realloc
1001             // cppcheck-suppress memleak
1002             msg = pcmk__realloc(msg, msg->header.size);
1003             memcpy(msg->data, data, msg->size);
1004         }
1005 
1006         free(uncompressed);
1007         free(compressed);
1008     }
1009 
1010     iov = calloc(1, sizeof(struct iovec));
1011     iov->iov_base = msg;
1012     iov->iov_len = msg->header.size;
1013 
1014     if (msg->compressed_size) {
1015         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1016                   msg->id, target, (unsigned long long) iov->iov_len,
1017                   msg->compressed_size, data);
1018     } else {
1019         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1020                   msg->id, target, (unsigned long long) iov->iov_len,
1021                   msg->size, data);
1022     }
1023     free(target);
1024 
1025     cs_message_queue = g_list_append(cs_message_queue, iov);
1026     crm_cs_flush(&pcmk_cpg_handle);
1027 
1028     return TRUE;
1029 }
1030 
1031 /*!
1032  * \brief Get the message type equivalent of a string
1033  *
1034  * \param[in] text  String of message type
1035  *
1036  * \return Message type equivalent of \p text
1037  */
1038 enum crm_ais_msg_types
1039 text2msg_type(const char *text)
     /* [previous][next][first][last][top][bottom][index][help] */
1040 {
1041     int type = crm_msg_none;
1042 
1043     CRM_CHECK(text != NULL, return type);
1044     text = pcmk__message_name(text);
1045     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1046         type = crm_msg_ais;
1047     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1048         type = crm_msg_cib;
1049     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1050         type = crm_msg_crmd;
1051     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1052         type = crm_msg_te;
1053     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1054         type = crm_msg_pe;
1055     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1056         type = crm_msg_lrmd;
1057     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1058         type = crm_msg_stonithd;
1059     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1060         type = crm_msg_stonith_ng;
1061     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1062         type = crm_msg_attrd;
1063 
1064     } else {
1065         /* This will normally be a transient client rather than
1066          * a cluster daemon.  Set the type to the pid of the client
1067          */
1068         int scan_rc = sscanf(text, "%d", &type);
1069 
1070         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1071             /* Ensure it's sane */
1072             type = crm_msg_none;
1073         }
1074     }
1075     return type;
1076 }

/* [previous][next][first][last][top][bottom][index][help] */