root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. cluster_disconnect_cpg
  2. get_local_nodeid
  3. crm_cs_flush_cb
  4. crm_cs_flush
  5. send_cpg_iov
  6. pcmk_cpg_dispatch
  7. pcmk_message_common_cs
  8. cmp_member_list_nodeid
  9. cpgreason2str
  10. peer_name
  11. pcmk_cpg_membership
  12. cluster_connect_cpg
  13. send_cluster_message_cs
  14. send_cluster_text
  15. text2msg_type

   1 /*
   2  * Copyright 2004-2020 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 #include <bzlib.h>
  12 #include <sys/socket.h>
  13 #include <netinet/in.h>
  14 #include <arpa/inet.h>
  15 #include <netdb.h>
  16 
  17 #include <crm/common/ipc.h>
  18 #include <crm/cluster/internal.h>
  19 #include <crm/common/mainloop.h>
  20 #include <sys/utsname.h>
  21 
  22 #include <qb/qbipcc.h>
  23 #include <qb/qbutil.h>
  24 
  25 #include <corosync/corodefs.h>
  26 #include <corosync/corotypes.h>
  27 #include <corosync/hdb.h>
  28 #include <corosync/cpg.h>
  29 
  30 #include <crm/msg_xml.h>
  31 
  32 #include <crm/common/ipc_internal.h>  /* PCMK__SPECIAL_PID* */
  33 
  34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
  35 
  36 static bool cpg_evicted = FALSE;
  37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
  38 
  39 #define cs_repeat(counter, max, code) do {              \
  40         code;                                           \
  41         if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
  42             counter++;                                  \
  43             crm_debug("Retrying operation after %ds", counter); \
  44             sleep(counter);                             \
  45         } else {                                        \
  46             break;                                      \
  47         }                                               \
  48     } while(counter < max)
  49 
  50 void
  51 cluster_disconnect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
  52 {
  53     pcmk_cpg_handle = 0;
  54     if (cluster->cpg_handle) {
  55         crm_trace("Disconnecting CPG");
  56         cpg_leave(cluster->cpg_handle, &cluster->group);
  57         cpg_finalize(cluster->cpg_handle);
  58         cluster->cpg_handle = 0;
  59 
  60     } else {
  61         crm_info("No CPG connection");
  62     }
  63 }
  64 
  65 uint32_t get_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
  66 {
  67     cs_error_t rc = CS_OK;
  68     int retries = 0;
  69     static uint32_t local_nodeid = 0;
  70     cpg_handle_t local_handle = handle;
  71     cpg_callbacks_t cb = { };
  72     int fd = -1;
  73     uid_t found_uid = 0;
  74     gid_t found_gid = 0;
  75     pid_t found_pid = 0;
  76     int rv;
  77 
  78     if(local_nodeid != 0) {
  79         return local_nodeid;
  80     }
  81 
  82     if(handle == 0) {
  83         crm_trace("Creating connection");
  84         cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
  85         if (rc != CS_OK) {
  86             crm_err("Could not connect to the CPG API: %s (%d)",
  87                     cs_strerror(rc), rc);
  88             return 0;
  89         }
  90 
  91         rc = cpg_fd_get(local_handle, &fd);
  92         if (rc != CS_OK) {
  93             crm_err("Could not obtain the CPG API connection: %s (%d)",
  94                     cs_strerror(rc), rc);
  95             goto bail;
  96         }
  97 
  98         /* CPG provider run as root (in given user namespace, anyway)? */
  99         if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 100                                                 &found_uid, &found_gid))) {
 101             crm_err("CPG provider is not authentic:"
 102                     " process %lld (uid: %lld, gid: %lld)",
 103                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 104                     (long long) found_uid, (long long) found_gid);
 105             goto bail;
 106         } else if (rv < 0) {
 107             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 108                     strerror(-rv), -rv);
 109             goto bail;
 110         }
 111     }
 112 
 113     if (rc == CS_OK) {
 114         retries = 0;
 115         crm_trace("Performing lookup");
 116         cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
 117     }
 118 
 119     if (rc != CS_OK) {
 120         crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
 121     }
 122 
 123 bail:
 124     if(handle == 0) {
 125         crm_trace("Closing connection");
 126         cpg_finalize(local_handle);
 127     }
 128     crm_debug("Local nodeid is %u", local_nodeid);
 129     return local_nodeid;
 130 }
 131 
 132 
 133 GListPtr cs_message_queue = NULL;
 134 int cs_message_timer = 0;
 135 
 136 static ssize_t crm_cs_flush(gpointer data);
 137 
 138 static gboolean
 139 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 140 {
 141     cs_message_timer = 0;
 142     crm_cs_flush(data);
 143     return FALSE;
 144 }
 145 
 146 #define CS_SEND_MAX 200
 147 static ssize_t
 148 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 149 {
 150     int sent = 0;
 151     ssize_t rc = 0;
 152     int queue_len = 0;
 153     static unsigned int last_sent = 0;
 154     cpg_handle_t *handle = (cpg_handle_t *)data;
 155 
 156     if (*handle == 0) {
 157         crm_trace("Connection is dead");
 158         return pcmk_ok;
 159     }
 160 
 161     queue_len = g_list_length(cs_message_queue);
 162     if ((queue_len % 1000) == 0 && queue_len > 1) {
 163         crm_err("CPG queue has grown to %d", queue_len);
 164 
 165     } else if (queue_len == CS_SEND_MAX) {
 166         crm_warn("CPG queue has grown to %d", queue_len);
 167     }
 168 
 169     if (cs_message_timer) {
 170         /* There is already a timer, wait until it goes off */
 171         crm_trace("Timer active %d", cs_message_timer);
 172         return pcmk_ok;
 173     }
 174 
 175     while (cs_message_queue && sent < CS_SEND_MAX) {
 176         struct iovec *iov = cs_message_queue->data;
 177 
 178         errno = 0;
 179         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 180 
 181         if (rc != CS_OK) {
 182             break;
 183         }
 184 
 185         sent++;
 186         last_sent++;
 187         crm_trace("CPG message sent, size=%llu",
 188                   (unsigned long long) iov->iov_len);
 189 
 190         cs_message_queue = g_list_remove(cs_message_queue, iov);
 191         free(iov->iov_base);
 192         free(iov);
 193     }
 194 
 195     queue_len -= sent;
 196     if (sent > 1 || cs_message_queue) {
 197         crm_info("Sent %d CPG messages  (%d remaining, last=%u): %s (%lld)",
 198                  sent, queue_len, last_sent, ais_error2text(rc),
 199                  (long long) rc);
 200     } else {
 201         crm_trace("Sent %d CPG messages  (%d remaining, last=%u): %s (%lld)",
 202                   sent, queue_len, last_sent, ais_error2text(rc),
 203                   (long long) rc);
 204     }
 205 
 206     if (cs_message_queue) {
 207         uint32_t delay_ms = 100;
 208         if(rc != CS_OK) {
 209             /* Proportionally more if sending failed but cap at 1s */
 210             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 211         }
 212         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 213     }
 214 
 215     return rc;
 216 }
 217 
 218 gboolean
 219 send_cpg_iov(struct iovec * iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 220 {
 221     static unsigned int queued = 0;
 222 
 223     queued++;
 224     crm_trace("Queueing CPG message %u (%llu bytes)",
 225               queued, (unsigned long long) iov->iov_len);
 226     cs_message_queue = g_list_append(cs_message_queue, iov);
 227     crm_cs_flush(&pcmk_cpg_handle);
 228     return TRUE;
 229 }
 230 
 231 static int
 232 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 233 {
 234     int rc = 0;
 235     crm_cluster_t *cluster = (crm_cluster_t*) user_data;
 236 
 237     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 238     if (rc != CS_OK) {
 239         crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
 240         cpg_finalize(cluster->cpg_handle);
 241         cluster->cpg_handle = 0;
 242         return -1;
 243 
 244     } else if(cpg_evicted) {
 245         crm_err("Evicted from CPG membership");
 246         return -1;
 247     }
 248     return 0;
 249 }
 250 
 251 char *
 252 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
     /* [previous][next][first][last][top][bottom][index][help] */
 253                         uint32_t *kind, const char **from)
 254 {
 255     char *data = NULL;
 256     AIS_Message *msg = (AIS_Message *) content;
 257 
 258     if(handle) {
 259         // Do filtering and field massaging
 260         uint32_t local_nodeid = get_local_nodeid(handle);
 261         const char *local_name = get_local_node_name();
 262 
 263         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
 264             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
 265             return NULL;
 266 
 267         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
 268             /* Not for us */
 269             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
 270             return NULL;
 271         } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 272             /* Not for us */
 273             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 274             return NULL;
 275         }
 276 
 277         msg->sender.id = nodeid;
 278         if (msg->sender.size == 0) {
 279             crm_node_t *peer = crm_get_peer(nodeid, NULL);
 280 
 281             if (peer == NULL) {
 282                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 283 
 284             } else if (peer->uname == NULL) {
 285                 crm_err("No uname for peer with nodeid=%u", nodeid);
 286 
 287             } else {
 288                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
 289                 msg->sender.size = strlen(peer->uname);
 290                 memset(msg->sender.uname, 0, MAX_NAME);
 291                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 292             }
 293         }
 294     }
 295 
 296     crm_trace("Got new%s message (size=%d, %d, %d)",
 297               msg->is_compressed ? " compressed" : "",
 298               ais_data_len(msg), msg->size, msg->compressed_size);
 299 
 300     if (kind != NULL) {
 301         *kind = msg->header.id;
 302     }
 303     if (from != NULL) {
 304         *from = msg->sender.uname;
 305     }
 306 
 307     if (msg->is_compressed && msg->size > 0) {
 308         int rc = BZ_OK;
 309         char *uncompressed = NULL;
 310         unsigned int new_size = msg->size + 1;
 311 
 312         if (check_message_sanity(msg, NULL) == FALSE) {
 313             goto badmsg;
 314         }
 315 
 316         crm_trace("Decompressing message data");
 317         uncompressed = calloc(1, new_size);
 318         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 319 
 320         if (rc != BZ_OK) {
 321             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
 322                     bz2_strerror(rc), rc);
 323             free(uncompressed);
 324             goto badmsg;
 325         }
 326 
 327         CRM_ASSERT(rc == BZ_OK);
 328         CRM_ASSERT(new_size == msg->size);
 329 
 330         data = uncompressed;
 331 
 332     } else if (check_message_sanity(msg, data) == FALSE) {
 333         goto badmsg;
 334 
 335     } else if (pcmk__str_eq("identify", data, pcmk__str_casei)) {
 336         char *pid_s = pcmk__getpid_s();
 337 
 338         send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
 339         free(pid_s);
 340         return NULL;
 341 
 342     } else {
 343         data = strdup(msg->data);
 344     }
 345 
 346     // Is this necessary?
 347     crm_get_peer(msg->sender.id, msg->sender.uname);
 348 
 349     crm_trace("Payload: %.200s", data);
 350     return data;
 351 
 352   badmsg:
 353     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 354             " min=%d, total=%d, size=%d, bz2_size=%d",
 355             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 356             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 357             msg->sender.pid, (int)sizeof(AIS_Message),
 358             msg->header.size, msg->size, msg->compressed_size);
 359 
 360     free(data);
 361     return NULL;
 362 }
 363 
 364 static int cmp_member_list_nodeid(const void *first,
     /* [previous][next][first][last][top][bottom][index][help] */
 365                                   const void *second)
 366 {
 367     const struct cpg_address *const a = *((const struct cpg_address **) first),
 368                              *const b = *((const struct cpg_address **) second);
 369     if (a->nodeid < b->nodeid) {
 370         return -1;
 371     } else if (a->nodeid > b->nodeid) {
 372         return 1;
 373     }
 374     /* don't bother with "reason" nor "pid" */
 375     return 0;
 376 }
 377 
 378 static const char *
 379 cpgreason2str(cpg_reason_t reason)
     /* [previous][next][first][last][top][bottom][index][help] */
 380 {
 381     switch (reason) {
 382         case CPG_REASON_JOIN:       return " via cpg_join";
 383         case CPG_REASON_LEAVE:      return " via cpg_leave";
 384         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 385         case CPG_REASON_NODEUP:     return " via cluster join";
 386         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 387         default:                    break;
 388     }
 389     return "";
 390 }
 391 
 392 static inline const char *
 393 peer_name(crm_node_t *peer)
     /* [previous][next][first][last][top][bottom][index][help] */
 394 {
 395     if (peer == NULL) {
 396         return "unknown node";
 397     } else if (peer->uname == NULL) {
 398         return "peer node";
 399     } else {
 400         return peer->uname;
 401     }
 402 }
 403 
 404 void
 405 pcmk_cpg_membership(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 406                     const struct cpg_name *groupName,
 407                     const struct cpg_address *member_list, size_t member_list_entries,
 408                     const struct cpg_address *left_list, size_t left_list_entries,
 409                     const struct cpg_address *joined_list, size_t joined_list_entries)
 410 {
 411     int i;
 412     gboolean found = FALSE;
 413     static int counter = 0;
 414     uint32_t local_nodeid = get_local_nodeid(handle);
 415     const struct cpg_address *key, **sorted;
 416 
 417     sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
 418     CRM_ASSERT(sorted != NULL);
 419 
 420     for (size_t iter = 0; iter < member_list_entries; iter++) {
 421         sorted[iter] = member_list + iter;
 422     }
 423     /* so that the cross-matching multiply-subscribed nodes is then cheap */
 424     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 425           cmp_member_list_nodeid);
 426 
 427     for (i = 0; i < left_list_entries; i++) {
 428         crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
 429         const struct cpg_address **rival = NULL;
 430 
 431         /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
 432            and not playing by this rule may go wild in case of multiple
 433            residual instances of the same pacemaker daemon at the same node
 434            -- we must ensure that the possible local rival(s) won't make us
 435            cry out and bail (e.g. when they quit themselves), since all the
 436            surrounding logic denies this simple fact that the full membership
 437            is discriminated also per the PID of the process beside mere node
 438            ID (and implicitly, group ID); practically, this will be sound in
 439            terms of not preventing progress, since all the CPG joiners are
 440            also API end-point carriers, and that's what matters locally
 441            (who's the winner);
 442            remotely, we will just compare leave_list and member_list and if
 443            the left process has its node retained in member_list (under some
 444            other PID, anyway) we will just ignore it as well
 445            XXX: long-term fix is to establish in-out PID-aware tracking? */
 446         if (peer) {
 447             key = &left_list[i];
 448             rival = bsearch(&key, sorted, member_list_entries,
 449                             sizeof(const struct cpg_address *),
 450                             cmp_member_list_nodeid);
 451         }
 452 
 453         if (rival == NULL) {
 454             crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 455                      groupName->value, counter, peer_name(peer),
 456                      left_list[i].nodeid, left_list[i].pid,
 457                      cpgreason2str(left_list[i].reason));
 458             if (peer) {
 459                 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 460                                      OFFLINESTATUS);
 461             }
 462         } else if (left_list[i].nodeid == local_nodeid) {
 463             crm_warn("Group %s event %d: duplicate local pid %u left%s",
 464                      groupName->value, counter,
 465                      left_list[i].pid, cpgreason2str(left_list[i].reason));
 466         } else {
 467             crm_warn("Group %s event %d: "
 468                      "%s (node %u) duplicate pid %u left%s (%u remains)",
 469                      groupName->value, counter, peer_name(peer),
 470                      left_list[i].nodeid, left_list[i].pid,
 471                      cpgreason2str(left_list[i].reason), (*rival)->pid);
 472         }
 473     }
 474     free(sorted);
 475     sorted = NULL;
 476 
 477     for (i = 0; i < joined_list_entries; i++) {
 478         crm_info("Group %s event %d: node %u pid %u joined%s",
 479                  groupName->value, counter, joined_list[i].nodeid,
 480                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 481     }
 482 
 483     for (i = 0; i < member_list_entries; i++) {
 484         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
 485 
 486         if (member_list[i].nodeid == local_nodeid
 487                 && member_list[i].pid != getpid()) {
 488             /* see the note above */
 489             crm_warn("Group %s event %d: detected duplicate local pid %u",
 490                      groupName->value, counter, member_list[i].pid);
 491             continue;
 492         }
 493         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 494                  groupName->value, counter, peer_name(peer),
 495                  member_list[i].nodeid, member_list[i].pid);
 496 
 497         /* If the caller left auto-reaping enabled, this will also update the
 498          * state to member.
 499          */
 500         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 501                                     ONLINESTATUS);
 502 
 503         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
 504             /* The node is a CPG member, but we currently think it's not a
 505              * cluster member. This is possible only if auto-reaping was
 506              * disabled. The node may be joining, and we happened to get the CPG
 507              * notification before the quorum notification; or the node may have
 508              * just died, and we are processing its final messages; or a bug
 509              * has affected the peer cache.
 510              */
 511             time_t now = time(NULL);
 512 
 513             if (peer->when_lost == 0) {
 514                 // Track when we first got into this contradictory state
 515                 peer->when_lost = now;
 516 
 517             } else if (now > (peer->when_lost + 60)) {
 518                 // If it persists for more than a minute, update the state
 519                 crm_warn("Node %u is member of group %s but was believed offline",
 520                          member_list[i].nodeid, groupName->value);
 521                 crm_update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
 522             }
 523         }
 524 
 525         if (local_nodeid == member_list[i].nodeid) {
 526             found = TRUE;
 527         }
 528     }
 529 
 530     if (!found) {
 531         crm_err("Local node was evicted from group %s", groupName->value);
 532         cpg_evicted = TRUE;
 533     }
 534 
 535     counter++;
 536 }
 537 
 538 gboolean
 539 cluster_connect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 540 {
 541     cs_error_t rc;
 542     int fd = -1;
 543     int retries = 0;
 544     uint32_t id = 0;
 545     crm_node_t *peer = NULL;
 546     cpg_handle_t handle = 0;
 547     const char *message_name = pcmk__message_name(crm_system_name);
 548     uid_t found_uid = 0;
 549     gid_t found_gid = 0;
 550     pid_t found_pid = 0;
 551     int rv;
 552 
 553     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 554         .dispatch = pcmk_cpg_dispatch,
 555         .destroy = cluster->destroy,
 556     };
 557 
 558     cpg_callbacks_t cpg_callbacks = {
 559         .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 560         .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 561         /* .cpg_deliver_fn = pcmk_cpg_deliver, */
 562         /* .cpg_confchg_fn = pcmk_cpg_membership, */
 563     };
 564 
 565     cpg_evicted = FALSE;
 566     cluster->group.length = 0;
 567     cluster->group.value[0] = 0;
 568 
 569     /* group.value is char[128] */
 570     strncpy(cluster->group.value, message_name, 127);
 571     cluster->group.value[127] = 0;
 572     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 573 
 574     cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
 575     if (rc != CS_OK) {
 576         crm_err("Could not connect to the CPG API: %s (%d)",
 577                 cs_strerror(rc), rc);
 578         goto bail;
 579     }
 580 
 581     rc = cpg_fd_get(handle, &fd);
 582     if (rc != CS_OK) {
 583         crm_err("Could not obtain the CPG API connection: %s (%d)",
 584                 cs_strerror(rc), rc);
 585         goto bail;
 586     }
 587 
 588     /* CPG provider run as root (in given user namespace, anyway)? */
 589     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 590                                             &found_uid, &found_gid))) {
 591         crm_err("CPG provider is not authentic:"
 592                 " process %lld (uid: %lld, gid: %lld)",
 593                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 594                 (long long) found_uid, (long long) found_gid);
 595         rc = CS_ERR_ACCESS;
 596         goto bail;
 597     } else if (rv < 0) {
 598         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 599                 strerror(-rv), -rv);
 600         rc = CS_ERR_ACCESS;
 601         goto bail;
 602     }
 603 
 604     id = get_local_nodeid(handle);
 605     if (id == 0) {
 606         crm_err("Could not get local node id from the CPG API");
 607         goto bail;
 608 
 609     }
 610     cluster->nodeid = id;
 611 
 612     retries = 0;
 613     cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
 614     if (rc != CS_OK) {
 615         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
 616         goto bail;
 617     }
 618 
 619     pcmk_cpg_handle = handle;
 620     cluster->cpg_handle = handle;
 621     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 622 
 623   bail:
 624     if (rc != CS_OK) {
 625         cpg_finalize(handle);
 626         return FALSE;
 627     }
 628 
 629     peer = crm_get_peer(id, NULL);
 630     crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
 631     return TRUE;
 632 }
 633 
 634 gboolean
 635 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
     /* [previous][next][first][last][top][bottom][index][help] */
 636 {
 637     gboolean rc = TRUE;
 638     char *data = NULL;
 639 
 640     data = dump_xml_unformatted(msg);
 641     rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
 642     free(data);
 643     return rc;
 644 }
 645 
 646 gboolean
 647 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
     /* [previous][next][first][last][top][bottom][index][help] */
 648                   gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
 649 {
 650     static int msg_id = 0;
 651     static int local_pid = 0;
 652     static int local_name_len = 0;
 653     static const char *local_name = NULL;
 654 
 655     char *target = NULL;
 656     struct iovec *iov;
 657     AIS_Message *msg = NULL;
 658     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 659 
 660     switch (msg_class) {
 661         case crm_class_cluster:
 662             break;
 663         default:
 664             crm_err("Invalid message class: %d", msg_class);
 665             return FALSE;
 666     }
 667 
 668     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 669 
 670     if(local_name == NULL) {
 671         local_name = get_local_node_name();
 672     }
 673     if(local_name_len == 0 && local_name) {
 674         local_name_len = strlen(local_name);
 675     }
 676 
 677     if (data == NULL) {
 678         data = "";
 679     }
 680 
 681     if (local_pid == 0) {
 682         local_pid = getpid();
 683     }
 684 
 685     if (sender == crm_msg_none) {
 686         sender = local_pid;
 687     }
 688 
 689     msg = calloc(1, sizeof(AIS_Message));
 690 
 691     msg_id++;
 692     msg->id = msg_id;
 693     msg->header.id = msg_class;
 694     msg->header.error = CS_OK;
 695 
 696     msg->host.type = dest;
 697     msg->host.local = local;
 698 
 699     if (node) {
 700         if (node->uname) {
 701             target = strdup(node->uname);
 702             msg->host.size = strlen(node->uname);
 703             memset(msg->host.uname, 0, MAX_NAME);
 704             memcpy(msg->host.uname, node->uname, msg->host.size);
 705         } else {
 706             target = crm_strdup_printf("%u", node->id);
 707         }
 708         msg->host.id = node->id;
 709     } else {
 710         target = strdup("all");
 711     }
 712 
 713     msg->sender.id = 0;
 714     msg->sender.type = sender;
 715     msg->sender.pid = local_pid;
 716     msg->sender.size = local_name_len;
 717     memset(msg->sender.uname, 0, MAX_NAME);
 718     if(local_name && msg->sender.size) {
 719         memcpy(msg->sender.uname, local_name, msg->sender.size);
 720     }
 721 
 722     msg->size = 1 + strlen(data);
 723     msg->header.size = sizeof(AIS_Message) + msg->size;
 724 
 725     if (msg->size < CRM_BZ2_THRESHOLD) {
 726         msg = pcmk__realloc(msg, msg->header.size);
 727         memcpy(msg->data, data, msg->size);
 728 
 729     } else {
 730         char *compressed = NULL;
 731         unsigned int new_size = 0;
 732         char *uncompressed = strdup(data);
 733 
 734         if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
 735                            &compressed, &new_size) == pcmk_rc_ok) {
 736 
 737             msg->header.size = sizeof(AIS_Message) + new_size;
 738             msg = pcmk__realloc(msg, msg->header.size);
 739             memcpy(msg->data, compressed, new_size);
 740 
 741             msg->is_compressed = TRUE;
 742             msg->compressed_size = new_size;
 743 
 744         } else {
 745             // cppcheck seems not to understand the abort logic in pcmk__realloc
 746             // cppcheck-suppress memleak
 747             msg = pcmk__realloc(msg, msg->header.size);
 748             memcpy(msg->data, data, msg->size);
 749         }
 750 
 751         free(uncompressed);
 752         free(compressed);
 753     }
 754 
 755     iov = calloc(1, sizeof(struct iovec));
 756     iov->iov_base = msg;
 757     iov->iov_len = msg->header.size;
 758 
 759     if (msg->compressed_size) {
 760         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
 761                   msg->id, target, (unsigned long long) iov->iov_len,
 762                   msg->compressed_size, data);
 763     } else {
 764         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
 765                   msg->id, target, (unsigned long long) iov->iov_len,
 766                   msg->size, data);
 767     }
 768     free(target);
 769 
 770     send_cpg_iov(iov);
 771 
 772     return TRUE;
 773 }
 774 
 775 enum crm_ais_msg_types
 776 text2msg_type(const char *text)
     /* [previous][next][first][last][top][bottom][index][help] */
 777 {
 778     int type = crm_msg_none;
 779 
 780     CRM_CHECK(text != NULL, return type);
 781     text = pcmk__message_name(text);
 782     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
 783         type = crm_msg_ais;
 784     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
 785         type = crm_msg_cib;
 786     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
 787         type = crm_msg_crmd;
 788     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
 789         type = crm_msg_te;
 790     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
 791         type = crm_msg_pe;
 792     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
 793         type = crm_msg_lrmd;
 794     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
 795         type = crm_msg_stonithd;
 796     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
 797         type = crm_msg_stonith_ng;
 798     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
 799         type = crm_msg_attrd;
 800 
 801     } else {
 802         /* This will normally be a transient client rather than
 803          * a cluster daemon.  Set the type to the pid of the client
 804          */
 805         int scan_rc = sscanf(text, "%d", &type);
 806 
 807         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
 808             /* Ensure it's sane */
 809             type = crm_msg_none;
 810         }
 811     }
 812     return type;
 813 }

/* [previous][next][first][last][top][bottom][index][help] */