root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. cluster_disconnect_cpg
  2. get_local_nodeid
  3. crm_cs_flush_cb
  4. crm_cs_flush
  5. pcmk_cpg_dispatch
  6. ais_dest
  7. msg_type2text
  8. check_message_sanity
  9. pcmk_message_common_cs
  10. cmp_member_list_nodeid
  11. cpgreason2str
  12. peer_name
  13. pcmk_cpg_membership
  14. cluster_connect_cpg
  15. pcmk__cpg_send_xml
  16. send_cluster_text
  17. text2msg_type

   1 /*
   2  * Copyright 2004-2020 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 #include <bzlib.h>
  12 #include <sys/socket.h>
  13 #include <netinet/in.h>
  14 #include <arpa/inet.h>
  15 #include <netdb.h>
  16 
  17 #include <crm/common/ipc.h>
  18 #include <crm/cluster/internal.h>
  19 #include <crm/common/mainloop.h>
  20 #include <sys/utsname.h>
  21 
  22 #include <qb/qbipc_common.h>
  23 #include <qb/qbipcc.h>
  24 #include <qb/qbutil.h>
  25 
  26 #include <corosync/corodefs.h>
  27 #include <corosync/corotypes.h>
  28 #include <corosync/hdb.h>
  29 #include <corosync/cpg.h>
  30 
  31 #include <crm/msg_xml.h>
  32 
  33 #include <crm/common/ipc_internal.h>  /* PCMK__SPECIAL_PID* */
  34 #include "crmcluster_private.h"
  35 
   36 /* @TODO Once we can update the public API to require crm_cluster_t* in more
   37  *       functions, we can ditch this in favor of cluster->cpg_handle.
   38  */
      // Set by cluster_connect_cpg() after a successful join; reset to 0 by
      // cluster_disconnect_cpg()
   39 static cpg_handle_t pcmk_cpg_handle = 0;
   40
   41 // @TODO These could be moved to crm_cluster_t* at that time as well
      // Set by pcmk_cpg_membership() when the local node is missing from the
      // member list (which makes pcmk_cpg_dispatch() fail); reset on connect
   42 static bool cpg_evicted = false;
      // Outgoing messages (each a struct iovec *) consumed by crm_cs_flush()
   43 static GList *cs_message_queue = NULL;
      // glib source ID of the pending crm_cs_flush() retry timer, or 0 if none
   44 static int cs_message_timer = 0;
   45
      /* One endpoint (destination or sender) of a CPG message. Packed,
       * presumably because it is part of the message layout exchanged with
       * other nodes -- NOTE(review): confirm before changing any field.
       */
   46 struct pcmk__cpg_host_s {
   47     uint32_t id;                // Corosync node ID (0 as destination: any)
   48     uint32_t pid;               // Process ID (only logged in this file)
   49     gboolean local;             // If true, ais_dest() reports "local"
   50     enum crm_ais_msg_types type; // Daemon type (see msg_type2text())
   51     uint32_t size;              // Length of uname (0: no name, i.e. <all>)
   52     char uname[MAX_NAME];       // Node name
   53 } __attribute__ ((packed));
   54
   55 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
   56
      /* A CPG message: fixed-size header followed by msg_data_len() bytes of
       * payload in data. Packed for the same (presumed) wire-layout reason.
       */
   57 struct pcmk__cpg_msg_s {
   58     struct qb_ipc_response_header header __attribute__ ((aligned(8)));
          // header.id: message kind; header.size: total size of the message
          // including the payload; header.error: must be CS_OK
   59     uint32_t id;                // Message ID (used in logs)
   60     gboolean is_compressed;     // Whether data is bzip2-compressed
   61
   62     pcmk__cpg_host_t host;      // Destination
   63     pcmk__cpg_host_t sender;    // Sender
   64
   65     uint32_t size;              // Uncompressed payload length, including NUL
   66     uint32_t compressed_size;   // Length of data when is_compressed
   67     /* 584 bytes */
          // NOTE(review): presumably sizeof(pcmk__cpg_msg_t) on the reference
          // platform; not verified here
   68     char data[0];               // Payload (msg_data_len() bytes)
   69
   70 } __attribute__ ((packed));
   71
   72 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
   73
      // Forward declaration: crm_cs_flush_cb() is defined before crm_cs_flush()
   74 static void crm_cs_flush(gpointer data);
   75
      /* Number of payload bytes present in msg->data (compressed length if the
       * payload is compressed). Note the argument is not parenthesized in the
       * expansion, so pass a plain identifier.
       */
   76 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
   77
      /* Evaluate code into rc, retrying while it returns CS_ERR_TRY_AGAIN or
       * CS_ERR_QUEUE_FULL. Before each retry, counter is incremented and the
       * caller sleeps that many seconds (1, 2, 3, ...). Stops once counter
       * reaches max, so the caller must initialize counter (normally to 0).
       * A sleep also happens after the final failed attempt, and rc then holds
       * the last "try again" code.
       */
   78 #define cs_repeat(rc, counter, max, code) do {                          \
   79         rc = code;                                                      \
   80         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \
   81             counter++;                                                  \
   82             crm_debug("Retrying operation after %ds", counter);         \
   83             sleep(counter);                                             \
   84         } else {                                                        \
   85             break;                                                      \
   86         }                                                               \
   87     } while (counter < max)
  88 
  89 /*!
  90  * \brief Disconnect from Corosync CPG
  91  *
  92  * \param[in] Cluster to disconnect
  93  */
  94 void
  95 cluster_disconnect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
  96 {
  97     pcmk_cpg_handle = 0;
  98     if (cluster->cpg_handle) {
  99         crm_trace("Disconnecting CPG");
 100         cpg_leave(cluster->cpg_handle, &cluster->group);
 101         cpg_finalize(cluster->cpg_handle);
 102         cluster->cpg_handle = 0;
 103 
 104     } else {
 105         crm_info("No CPG connection");
 106     }
 107 }
 108 
 109 /*!
 110  * \brief Get the local Corosync node ID (via CPG)
 111  *
 112  * \param[in] handle  CPG connection to use (or 0 to use new connection)
 113  *
 114  * \return Corosync ID of local node (or 0 if not known)
 115  */
 116 uint32_t
 117 get_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
 118 {
 119     cs_error_t rc = CS_OK;
 120     int retries = 0;
 121     static uint32_t local_nodeid = 0;
 122     cpg_handle_t local_handle = handle;
 123     cpg_callbacks_t cb = { };
 124     int fd = -1;
 125     uid_t found_uid = 0;
 126     gid_t found_gid = 0;
 127     pid_t found_pid = 0;
 128     int rv;
 129 
 130     if(local_nodeid != 0) {
 131         return local_nodeid;
 132     }
 133 
 134     if(handle == 0) {
 135         crm_trace("Creating connection");
 136         cs_repeat(rc, retries, 5, cpg_initialize(&local_handle, &cb));
 137         if (rc != CS_OK) {
 138             crm_err("Could not connect to the CPG API: %s (%d)",
 139                     cs_strerror(rc), rc);
 140             return 0;
 141         }
 142 
 143         rc = cpg_fd_get(local_handle, &fd);
 144         if (rc != CS_OK) {
 145             crm_err("Could not obtain the CPG API connection: %s (%d)",
 146                     cs_strerror(rc), rc);
 147             goto bail;
 148         }
 149 
 150         /* CPG provider run as root (in given user namespace, anyway)? */
 151         if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 152                                                 &found_uid, &found_gid))) {
 153             crm_err("CPG provider is not authentic:"
 154                     " process %lld (uid: %lld, gid: %lld)",
 155                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 156                     (long long) found_uid, (long long) found_gid);
 157             goto bail;
 158         } else if (rv < 0) {
 159             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 160                     strerror(-rv), -rv);
 161             goto bail;
 162         }
 163     }
 164 
 165     if (rc == CS_OK) {
 166         retries = 0;
 167         crm_trace("Performing lookup");
 168         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
 169     }
 170 
 171     if (rc != CS_OK) {
 172         crm_err("Could not get local node id from the CPG API: %s (%d)",
 173                 pcmk__cs_err_str(rc), rc);
 174     }
 175 
 176 bail:
 177     if(handle == 0) {
 178         crm_trace("Closing connection");
 179         cpg_finalize(local_handle);
 180     }
 181     crm_debug("Local nodeid is %u", local_nodeid);
 182     return local_nodeid;
 183 }
 184 
 185 /*!
 186  * \internal
 187  * \brief Callback function for Corosync message queue timer
 188  *
 189  * \param[in] data  CPG handle
 190  *
 191  * \return FALSE (to indicate to glib that timer should not be removed)
 192  */
 193 static gboolean
 194 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 195 {
 196     cs_message_timer = 0;
 197     crm_cs_flush(data);
 198     return FALSE;
 199 }
 200 
 201 // Send no more than this many CPG messages in one flush
 202 #define CS_SEND_MAX 200
 203 
 204 /*!
 205  * \internal
 206  * \brief Send messages in Corosync CPG message queue
 207  *
 208  * \param[in] data   CPG handle
 209  */
 210 static void
 211 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 212 {
 213     unsigned int sent = 0;
 214     guint queue_len = 0;
 215     cs_error_t rc = 0;
 216     cpg_handle_t *handle = (cpg_handle_t *) data;
 217 
 218     if (*handle == 0) {
 219         crm_trace("Connection is dead");
 220         return;
 221     }
 222 
 223     queue_len = g_list_length(cs_message_queue);
 224     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
 225         crm_err("CPG queue has grown to %d", queue_len);
 226 
 227     } else if (queue_len == CS_SEND_MAX) {
 228         crm_warn("CPG queue has grown to %d", queue_len);
 229     }
 230 
 231     if (cs_message_timer != 0) {
 232         /* There is already a timer, wait until it goes off */
 233         crm_trace("Timer active %d", cs_message_timer);
 234         return;
 235     }
 236 
 237     while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
 238         struct iovec *iov = cs_message_queue->data;
 239 
 240         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 241         if (rc != CS_OK) {
 242             break;
 243         }
 244 
 245         sent++;
 246         crm_trace("CPG message sent, size=%llu",
 247                   (unsigned long long) iov->iov_len);
 248 
 249         cs_message_queue = g_list_remove(cs_message_queue, iov);
 250         free(iov->iov_base);
 251         free(iov);
 252     }
 253 
 254     queue_len -= sent;
 255     if ((sent > 1) || (cs_message_queue != NULL)) {
 256         crm_info("Sent %u CPG messages  (%d remaining): %s (%d)",
 257                  sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
 258     } else {
 259         crm_trace("Sent %u CPG messages  (%d remaining): %s (%d)",
 260                   sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
 261     }
 262 
 263     if (cs_message_queue) {
 264         uint32_t delay_ms = 100;
 265         if (rc != CS_OK) {
 266             /* Proportionally more if sending failed but cap at 1s */
 267             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 268         }
 269         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 270     }
 271 }
 272 
 273 /*!
 274  * \internal
 275  * \brief Dispatch function for CPG handle
 276  *
 277  * \param[in] user_data  Cluster object
 278  *
 279  * \return 0 on success, -1 on error (per mainloop_io_t interface)
 280  */
 281 static int
 282 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 283 {
 284     cs_error_t rc = CS_OK;
 285     crm_cluster_t *cluster = (crm_cluster_t *) user_data;
 286 
 287     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 288     if (rc != CS_OK) {
 289         crm_err("Connection to the CPG API failed: %s (%d)",
 290                 pcmk__cs_err_str(rc), rc);
 291         cpg_finalize(cluster->cpg_handle);
 292         cluster->cpg_handle = 0;
 293         return -1;
 294 
 295     } else if (cpg_evicted) {
 296         crm_err("Evicted from CPG membership");
 297         return -1;
 298     }
 299     return 0;
 300 }
 301 
 302 static inline const char *
 303 ais_dest(const pcmk__cpg_host_t *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 304 {
 305     if (host->local) {
 306         return "local";
 307     } else if (host->size > 0) {
 308         return host->uname;
 309     } else {
 310         return "<all>";
 311     }
 312 }
 313 
 314 static inline const char *
 315 msg_type2text(enum crm_ais_msg_types type)
     /* [previous][next][first][last][top][bottom][index][help] */
 316 {
 317     const char *text = "unknown";
 318 
 319     switch (type) {
 320         case crm_msg_none:
 321             text = "unknown";
 322             break;
 323         case crm_msg_ais:
 324             text = "ais";
 325             break;
 326         case crm_msg_cib:
 327             text = "cib";
 328             break;
 329         case crm_msg_crmd:
 330             text = "crmd";
 331             break;
 332         case crm_msg_pe:
 333             text = "pengine";
 334             break;
 335         case crm_msg_te:
 336             text = "tengine";
 337             break;
 338         case crm_msg_lrmd:
 339             text = "lrmd";
 340             break;
 341         case crm_msg_attrd:
 342             text = "attrd";
 343             break;
 344         case crm_msg_stonithd:
 345             text = "stonithd";
 346             break;
 347         case crm_msg_stonith_ng:
 348             text = "stonith-ng";
 349             break;
 350     }
 351     return text;
 352 }
 353 
 354 /*!
 355  * \internal
 356  * \brief Check whether a Corosync CPG message is valid
 357  *
 358  * \param[in] msg   Corosync CPG message to check
 359  *
 360  * \return true if \p msg is valid, otherwise false
 361  */
 362 static bool
 363 check_message_sanity(const pcmk__cpg_msg_t *msg)
     /* [previous][next][first][last][top][bottom][index][help] */
 364 {
 365     int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
 366 
 367     if (payload_size < 1) {
 368         crm_err("%sCPG message %d from %s invalid: "
 369                 "Claimed size of %d bytes is too small "
 370                 CRM_XS " from %s[%u] to %s@%s",
 371                 (msg->is_compressed? "Compressed " : ""),
 372                 msg->id, ais_dest(&(msg->sender)),
 373                 (int) msg->header.size,
 374                 msg_type2text(msg->sender.type), msg->sender.pid,
 375                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 376         return false;
 377     }
 378 
 379     if (msg->header.error != CS_OK) {
 380         crm_err("%sCPG message %d from %s invalid: "
 381                 "Sender indicated error %d "
 382                 CRM_XS " from %s[%u] to %s@%s",
 383                 (msg->is_compressed? "Compressed " : ""),
 384                 msg->id, ais_dest(&(msg->sender)),
 385                 msg->header.error,
 386                 msg_type2text(msg->sender.type), msg->sender.pid,
 387                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 388         return false;
 389     }
 390 
 391     if (msg_data_len(msg) != payload_size) {
 392         crm_err("%sCPG message %d from %s invalid: "
 393                 "Total size %d inconsistent with payload size %d "
 394                 CRM_XS " from %s[%u] to %s@%s",
 395                 (msg->is_compressed? "Compressed " : ""),
 396                 msg->id, ais_dest(&(msg->sender)),
 397                 (int) msg->header.size, (int) msg_data_len(msg),
 398                 msg_type2text(msg->sender.type), msg->sender.pid,
 399                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 400         return false;
 401     }
 402 
 403     if (!msg->is_compressed &&
 404         /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
 405          * but checking the last byte or two should be quick
 406          */
 407         (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
 408          || (msg->data[msg->size - 1] != '\0'))) {
 409         crm_err("CPG message %d from %s invalid: "
 410                 "Payload does not end at byte %llu "
 411                 CRM_XS " from %s[%u] to %s@%s",
 412                 msg->id, ais_dest(&(msg->sender)),
 413                 (unsigned long long) msg->size,
 414                 msg_type2text(msg->sender.type), msg->sender.pid,
 415                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 416         return false;
 417     }
 418 
 419     crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
 420               (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
 421               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
 422               ais_dest(&(msg->sender)),
 423               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
 424     return true;
 425 }
 426 
 427 /*!
 428  * \brief Extract text data from a Corosync CPG message
 429  *
 430  * \param[in]  handle   CPG connection (to get local node ID if not yet known)
 431  * \param[in]  nodeid   Corosync ID of node that sent message
 432  * \param[in]  pid      Process ID of message sender (for logging only)
 433  * \param[in]  content  CPG message
 434  * \param[out] kind     If not NULL, will be set to CPG header ID
 435  *                      (which should be an enum crm_ais_msg_class value,
 436  *                      currently always crm_class_cluster)
 437  * \param[out] from     If not NULL, will be set to sender uname
 438  *                      (valid for the lifetime of \p content)
 439  *
 440  * \return Newly allocated string with message data
 441  * \note It is the caller's responsibility to free the return value with free().
 442  */
 443 char *
 444 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
     /* [previous][next][first][last][top][bottom][index][help] */
 445                         uint32_t *kind, const char **from)
 446 {
 447     char *data = NULL;
 448     pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
 449 
 450     if(handle) {
 451         // Do filtering and field massaging
 452         uint32_t local_nodeid = get_local_nodeid(handle);
 453         const char *local_name = get_local_node_name();
 454 
 455         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
 456             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
 457             return NULL;
 458 
 459         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
 460             /* Not for us */
 461             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
 462             return NULL;
 463         } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 464             /* Not for us */
 465             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 466             return NULL;
 467         }
 468 
 469         msg->sender.id = nodeid;
 470         if (msg->sender.size == 0) {
 471             crm_node_t *peer = crm_get_peer(nodeid, NULL);
 472 
 473             if (peer == NULL) {
 474                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 475 
 476             } else if (peer->uname == NULL) {
 477                 crm_err("No uname for peer with nodeid=%u", nodeid);
 478 
 479             } else {
 480                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
 481                 msg->sender.size = strlen(peer->uname);
 482                 memset(msg->sender.uname, 0, MAX_NAME);
 483                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 484             }
 485         }
 486     }
 487 
 488     crm_trace("Got new%s message (size=%d, %d, %d)",
 489               msg->is_compressed ? " compressed" : "",
 490               msg_data_len(msg), msg->size, msg->compressed_size);
 491 
 492     if (kind != NULL) {
 493         *kind = msg->header.id;
 494     }
 495     if (from != NULL) {
 496         *from = msg->sender.uname;
 497     }
 498 
 499     if (msg->is_compressed && msg->size > 0) {
 500         int rc = BZ_OK;
 501         char *uncompressed = NULL;
 502         unsigned int new_size = msg->size + 1;
 503 
 504         if (!check_message_sanity(msg)) {
 505             goto badmsg;
 506         }
 507 
 508         crm_trace("Decompressing message data");
 509         uncompressed = calloc(1, new_size);
 510         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 511 
 512         if (rc != BZ_OK) {
 513             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
 514                     bz2_strerror(rc), rc);
 515             free(uncompressed);
 516             goto badmsg;
 517         }
 518 
 519         CRM_ASSERT(rc == BZ_OK);
 520         CRM_ASSERT(new_size == msg->size);
 521 
 522         data = uncompressed;
 523 
 524     } else if (!check_message_sanity(msg)) {
 525         goto badmsg;
 526 
 527     } else {
 528         data = strdup(msg->data);
 529     }
 530 
 531     // Is this necessary?
 532     crm_get_peer(msg->sender.id, msg->sender.uname);
 533 
 534     crm_trace("Payload: %.200s", data);
 535     return data;
 536 
 537   badmsg:
 538     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 539             " min=%d, total=%d, size=%d, bz2_size=%d",
 540             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 541             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 542             msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
 543             msg->header.size, msg->size, msg->compressed_size);
 544 
 545     free(data);
 546     return NULL;
 547 }
 548 
 549 /*!
 550  * \internal
 551  * \brief Compare cpg_address objects by node ID
 552  *
 553  * \param[in] first   First cpg_address structure to compare
 554  * \param[in] second  Second cpg_address structure to compare
 555  *
 556  * \return Negative number if first's node ID is lower,
 557  *         positive number if first's node ID is greater,
 558  *         or 0 if both node IDs are equal
 559  */
 560 static int
 561 cmp_member_list_nodeid(const void *first, const void *second)
     /* [previous][next][first][last][top][bottom][index][help] */
 562 {
 563     const struct cpg_address *const a = *((const struct cpg_address **) first),
 564                              *const b = *((const struct cpg_address **) second);
 565     if (a->nodeid < b->nodeid) {
 566         return -1;
 567     } else if (a->nodeid > b->nodeid) {
 568         return 1;
 569     }
 570     /* don't bother with "reason" nor "pid" */
 571     return 0;
 572 }
 573 
 574 /*!
 575  * \internal
 576  * \brief Get a readable string equivalent of a cpg_reason_t value
 577  *
 578  * \param[in] reason  CPG reason value
 579  *
 580  * \return Readable string suitable for logging
 581  */
 582 static const char *
 583 cpgreason2str(cpg_reason_t reason)
     /* [previous][next][first][last][top][bottom][index][help] */
 584 {
 585     switch (reason) {
 586         case CPG_REASON_JOIN:       return " via cpg_join";
 587         case CPG_REASON_LEAVE:      return " via cpg_leave";
 588         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 589         case CPG_REASON_NODEUP:     return " via cluster join";
 590         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 591         default:                    break;
 592     }
 593     return "";
 594 }
 595 
 596 /*!
 597  * \internal
 598  * \brief Get a log-friendly node name
 599  *
 600  * \param[in] peer  Node to check
 601  *
 602  * \return Node's uname, or readable string if not known
 603  */
 604 static inline const char *
 605 peer_name(crm_node_t *peer)
     /* [previous][next][first][last][top][bottom][index][help] */
 606 {
 607     if (peer == NULL) {
 608         return "unknown node";
 609     } else if (peer->uname == NULL) {
 610         return "peer node";
 611     } else {
 612         return peer->uname;
 613     }
 614 }
 615 
 616 /*!
 617  * \brief Handle a CPG configuration change event
 618  *
 619  * \param[in] handle               CPG connection
 620  * \param[in] cpg_name             CPG group name
 621  * \param[in] member_list          List of current CPG members
 622  * \param[in] member_list_entries  Number of entries in \p member_list
 623  * \param[in] left_list            List of CPG members that left
 624  * \param[in] left_list_entries    Number of entries in \p left_list
 625  * \param[in] joined_list          List of CPG members that joined
 626  * \param[in] joined_list_entries  Number of entries in \p joined_list
 627  */
 628 void
 629 pcmk_cpg_membership(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 630                     const struct cpg_name *groupName,
 631                     const struct cpg_address *member_list, size_t member_list_entries,
 632                     const struct cpg_address *left_list, size_t left_list_entries,
 633                     const struct cpg_address *joined_list, size_t joined_list_entries)
 634 {
 635     int i;
 636     gboolean found = FALSE;
 637     static int counter = 0;
 638     uint32_t local_nodeid = get_local_nodeid(handle);
 639     const struct cpg_address *key, **sorted;
 640 
 641     sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
 642     CRM_ASSERT(sorted != NULL);
 643 
 644     for (size_t iter = 0; iter < member_list_entries; iter++) {
 645         sorted[iter] = member_list + iter;
 646     }
 647     /* so that the cross-matching multiply-subscribed nodes is then cheap */
 648     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 649           cmp_member_list_nodeid);
 650 
 651     for (i = 0; i < left_list_entries; i++) {
 652         crm_node_t *peer = pcmk__search_cluster_node_cache(left_list[i].nodeid,
 653                                                            NULL);
 654         const struct cpg_address **rival = NULL;
 655 
 656         /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
 657            and not playing by this rule may go wild in case of multiple
 658            residual instances of the same pacemaker daemon at the same node
 659            -- we must ensure that the possible local rival(s) won't make us
 660            cry out and bail (e.g. when they quit themselves), since all the
 661            surrounding logic denies this simple fact that the full membership
 662            is discriminated also per the PID of the process beside mere node
 663            ID (and implicitly, group ID); practically, this will be sound in
 664            terms of not preventing progress, since all the CPG joiners are
 665            also API end-point carriers, and that's what matters locally
 666            (who's the winner);
 667            remotely, we will just compare leave_list and member_list and if
 668            the left process has its node retained in member_list (under some
 669            other PID, anyway) we will just ignore it as well
 670            XXX: long-term fix is to establish in-out PID-aware tracking? */
 671         if (peer) {
 672             key = &left_list[i];
 673             rival = bsearch(&key, sorted, member_list_entries,
 674                             sizeof(const struct cpg_address *),
 675                             cmp_member_list_nodeid);
 676         }
 677 
 678         if (rival == NULL) {
 679             crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 680                      groupName->value, counter, peer_name(peer),
 681                      left_list[i].nodeid, left_list[i].pid,
 682                      cpgreason2str(left_list[i].reason));
 683             if (peer) {
 684                 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 685                                      OFFLINESTATUS);
 686             }
 687         } else if (left_list[i].nodeid == local_nodeid) {
 688             crm_warn("Group %s event %d: duplicate local pid %u left%s",
 689                      groupName->value, counter,
 690                      left_list[i].pid, cpgreason2str(left_list[i].reason));
 691         } else {
 692             crm_warn("Group %s event %d: "
 693                      "%s (node %u) duplicate pid %u left%s (%u remains)",
 694                      groupName->value, counter, peer_name(peer),
 695                      left_list[i].nodeid, left_list[i].pid,
 696                      cpgreason2str(left_list[i].reason), (*rival)->pid);
 697         }
 698     }
 699     free(sorted);
 700     sorted = NULL;
 701 
 702     for (i = 0; i < joined_list_entries; i++) {
 703         crm_info("Group %s event %d: node %u pid %u joined%s",
 704                  groupName->value, counter, joined_list[i].nodeid,
 705                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 706     }
 707 
 708     for (i = 0; i < member_list_entries; i++) {
 709         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
 710 
 711         if (member_list[i].nodeid == local_nodeid
 712                 && member_list[i].pid != getpid()) {
 713             /* see the note above */
 714             crm_warn("Group %s event %d: detected duplicate local pid %u",
 715                      groupName->value, counter, member_list[i].pid);
 716             continue;
 717         }
 718         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 719                  groupName->value, counter, peer_name(peer),
 720                  member_list[i].nodeid, member_list[i].pid);
 721 
 722         /* If the caller left auto-reaping enabled, this will also update the
 723          * state to member.
 724          */
 725         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 726                                     ONLINESTATUS);
 727 
 728         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
 729             /* The node is a CPG member, but we currently think it's not a
 730              * cluster member. This is possible only if auto-reaping was
 731              * disabled. The node may be joining, and we happened to get the CPG
 732              * notification before the quorum notification; or the node may have
 733              * just died, and we are processing its final messages; or a bug
 734              * has affected the peer cache.
 735              */
 736             time_t now = time(NULL);
 737 
 738             if (peer->when_lost == 0) {
 739                 // Track when we first got into this contradictory state
 740                 peer->when_lost = now;
 741 
 742             } else if (now > (peer->when_lost + 60)) {
 743                 // If it persists for more than a minute, update the state
 744                 crm_warn("Node %u is member of group %s but was believed offline",
 745                          member_list[i].nodeid, groupName->value);
 746                 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
 747             }
 748         }
 749 
 750         if (local_nodeid == member_list[i].nodeid) {
 751             found = TRUE;
 752         }
 753     }
 754 
 755     if (!found) {
 756         crm_err("Local node was evicted from group %s", groupName->value);
 757         cpg_evicted = true;
 758     }
 759 
 760     counter++;
 761 }
 762 
 763 /*!
 764  * \brief Connect to Corosync CPG
 765  *
 766  * \param[in] cluster  Cluster object
 767  *
 768  * \return TRUE on success, otherwise FALSE
 769  */
 770 gboolean
 771 cluster_connect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 772 {
 773     cs_error_t rc;
 774     int fd = -1;
 775     int retries = 0;
 776     uint32_t id = 0;
 777     crm_node_t *peer = NULL;
 778     cpg_handle_t handle = 0;
 779     const char *message_name = pcmk__message_name(crm_system_name);
 780     uid_t found_uid = 0;
 781     gid_t found_gid = 0;
 782     pid_t found_pid = 0;
 783     int rv;
 784 
 785     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 786         .dispatch = pcmk_cpg_dispatch,
 787         .destroy = cluster->destroy,
 788     };
 789 
 790     cpg_callbacks_t cpg_callbacks = {
 791         .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 792         .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 793         /* .cpg_deliver_fn = pcmk_cpg_deliver, */
 794         /* .cpg_confchg_fn = pcmk_cpg_membership, */
 795     };
 796 
 797     cpg_evicted = false;
 798     cluster->group.length = 0;
 799     cluster->group.value[0] = 0;
 800 
 801     /* group.value is char[128] */
 802     strncpy(cluster->group.value, message_name, 127);
 803     cluster->group.value[127] = 0;
 804     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 805 
 806     cs_repeat(rc, retries, 30, cpg_initialize(&handle, &cpg_callbacks));
 807     if (rc != CS_OK) {
 808         crm_err("Could not connect to the CPG API: %s (%d)",
 809                 cs_strerror(rc), rc);
 810         goto bail;
 811     }
 812 
 813     rc = cpg_fd_get(handle, &fd);
 814     if (rc != CS_OK) {
 815         crm_err("Could not obtain the CPG API connection: %s (%d)",
 816                 cs_strerror(rc), rc);
 817         goto bail;
 818     }
 819 
 820     /* CPG provider run as root (in given user namespace, anyway)? */
 821     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 822                                             &found_uid, &found_gid))) {
 823         crm_err("CPG provider is not authentic:"
 824                 " process %lld (uid: %lld, gid: %lld)",
 825                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 826                 (long long) found_uid, (long long) found_gid);
 827         rc = CS_ERR_ACCESS;
 828         goto bail;
 829     } else if (rv < 0) {
 830         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 831                 strerror(-rv), -rv);
 832         rc = CS_ERR_ACCESS;
 833         goto bail;
 834     }
 835 
 836     id = get_local_nodeid(handle);
 837     if (id == 0) {
 838         crm_err("Could not get local node id from the CPG API");
 839         goto bail;
 840 
 841     }
 842     cluster->nodeid = id;
 843 
 844     retries = 0;
 845     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
 846     if (rc != CS_OK) {
 847         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
 848         goto bail;
 849     }
 850 
 851     pcmk_cpg_handle = handle;
 852     cluster->cpg_handle = handle;
 853     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 854 
 855   bail:
 856     if (rc != CS_OK) {
 857         cpg_finalize(handle);
 858         return FALSE;
 859     }
 860 
 861     peer = crm_get_peer(id, NULL);
 862     crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
 863     return TRUE;
 864 }
 865 
 866 /*!
 867  * \internal
 868  * \brief Send an XML message via Corosync CPG
 869  *
 870  * \param[in] msg   XML message to send
 871  * \param[in] node  Cluster node to send message to
 872  * \param[in] dest  Type of message to send
 873  *
 874  * \return TRUE on success, otherwise FALSE
 875  */
 876 gboolean
 877 pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
     /* [previous][next][first][last][top][bottom][index][help] */
 878 {
 879     gboolean rc = TRUE;
 880     char *data = NULL;
 881 
 882     data = dump_xml_unformatted(msg);
 883     rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
 884     free(data);
 885     return rc;
 886 }
 887 
 888 /*!
 889  * \internal
 890  * \brief Send string data via Corosync CPG
 891  *
 892  * \param[in] msg_class  Message class (to set as CPG header ID)
 893  * \param[in] data       Data to send
 894  * \param[in] local      What to set as host "local" value (which is never used)
 895  * \param[in] node       Cluster node to send message to
 896  * \param[in] dest       Type of message to send
 897  *
 898  * \return TRUE on success, otherwise FALSE
 899  */
 900 gboolean
 901 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
     /* [previous][next][first][last][top][bottom][index][help] */
 902                   gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
 903 {
 904     static int msg_id = 0;
 905     static int local_pid = 0;
 906     static int local_name_len = 0;
 907     static const char *local_name = NULL;
 908 
 909     char *target = NULL;
 910     struct iovec *iov;
 911     pcmk__cpg_msg_t *msg = NULL;
 912     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 913 
 914     switch (msg_class) {
 915         case crm_class_cluster:
 916             break;
 917         default:
 918             crm_err("Invalid message class: %d", msg_class);
 919             return FALSE;
 920     }
 921 
 922     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 923 
 924     if (local_name == NULL) {
 925         local_name = get_local_node_name();
 926     }
 927     if ((local_name_len == 0) && (local_name != NULL)) {
 928         local_name_len = strlen(local_name);
 929     }
 930 
 931     if (data == NULL) {
 932         data = "";
 933     }
 934 
 935     if (local_pid == 0) {
 936         local_pid = getpid();
 937     }
 938 
 939     if (sender == crm_msg_none) {
 940         sender = local_pid;
 941     }
 942 
 943     msg = calloc(1, sizeof(pcmk__cpg_msg_t));
 944 
 945     msg_id++;
 946     msg->id = msg_id;
 947     msg->header.id = msg_class;
 948     msg->header.error = CS_OK;
 949 
 950     msg->host.type = dest;
 951     msg->host.local = local;
 952 
 953     if (node) {
 954         if (node->uname) {
 955             target = strdup(node->uname);
 956             msg->host.size = strlen(node->uname);
 957             memset(msg->host.uname, 0, MAX_NAME);
 958             memcpy(msg->host.uname, node->uname, msg->host.size);
 959         } else {
 960             target = crm_strdup_printf("%u", node->id);
 961         }
 962         msg->host.id = node->id;
 963     } else {
 964         target = strdup("all");
 965     }
 966 
 967     msg->sender.id = 0;
 968     msg->sender.type = sender;
 969     msg->sender.pid = local_pid;
 970     msg->sender.size = local_name_len;
 971     memset(msg->sender.uname, 0, MAX_NAME);
 972     if ((local_name != NULL) && (msg->sender.size != 0)) {
 973         memcpy(msg->sender.uname, local_name, msg->sender.size);
 974     }
 975 
 976     msg->size = 1 + strlen(data);
 977     msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
 978 
 979     if (msg->size < CRM_BZ2_THRESHOLD) {
 980         msg = pcmk__realloc(msg, msg->header.size);
 981         memcpy(msg->data, data, msg->size);
 982 
 983     } else {
 984         char *compressed = NULL;
 985         unsigned int new_size = 0;
 986         char *uncompressed = strdup(data);
 987 
 988         if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
 989                            &compressed, &new_size) == pcmk_rc_ok) {
 990 
 991             msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
 992             msg = pcmk__realloc(msg, msg->header.size);
 993             memcpy(msg->data, compressed, new_size);
 994 
 995             msg->is_compressed = TRUE;
 996             msg->compressed_size = new_size;
 997 
 998         } else {
 999             // cppcheck seems not to understand the abort logic in pcmk__realloc
1000             // cppcheck-suppress memleak
1001             msg = pcmk__realloc(msg, msg->header.size);
1002             memcpy(msg->data, data, msg->size);
1003         }
1004 
1005         free(uncompressed);
1006         free(compressed);
1007     }
1008 
1009     iov = calloc(1, sizeof(struct iovec));
1010     iov->iov_base = msg;
1011     iov->iov_len = msg->header.size;
1012 
1013     if (msg->compressed_size) {
1014         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1015                   msg->id, target, (unsigned long long) iov->iov_len,
1016                   msg->compressed_size, data);
1017     } else {
1018         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1019                   msg->id, target, (unsigned long long) iov->iov_len,
1020                   msg->size, data);
1021     }
1022     free(target);
1023 
1024     cs_message_queue = g_list_append(cs_message_queue, iov);
1025     crm_cs_flush(&pcmk_cpg_handle);
1026 
1027     return TRUE;
1028 }
1029 
1030 /*!
1031  * \brief Get the message type equivalent of a string
1032  *
1033  * \param[in] text  String of message type
1034  *
1035  * \return Message type equivalent of \p text
1036  */
1037 enum crm_ais_msg_types
1038 text2msg_type(const char *text)
     /* [previous][next][first][last][top][bottom][index][help] */
1039 {
1040     int type = crm_msg_none;
1041 
1042     CRM_CHECK(text != NULL, return type);
1043     text = pcmk__message_name(text);
1044     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1045         type = crm_msg_ais;
1046     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1047         type = crm_msg_cib;
1048     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1049         type = crm_msg_crmd;
1050     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1051         type = crm_msg_te;
1052     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1053         type = crm_msg_pe;
1054     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1055         type = crm_msg_lrmd;
1056     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1057         type = crm_msg_stonithd;
1058     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1059         type = crm_msg_stonith_ng;
1060     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1061         type = crm_msg_attrd;
1062 
1063     } else {
1064         /* This will normally be a transient client rather than
1065          * a cluster daemon.  Set the type to the pid of the client
1066          */
1067         int scan_rc = sscanf(text, "%d", &type);
1068 
1069         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1070             /* Ensure it's sane */
1071             type = crm_msg_none;
1072         }
1073     }
1074     return type;
1075 }

/* [previous][next][first][last][top][bottom][index][help] */