This source file includes the following definitions.
- cluster_disconnect_cpg
 
- get_local_nodeid
 
- crm_cs_flush_cb
 
- crm_cs_flush
 
- send_cpg_iov
 
- pcmk_cpg_dispatch
 
- pcmk_message_common_cs
 
- cmp_member_list_nodeid
 
- cpgreason2str
 
- peer_name
 
- pcmk_cpg_membership
 
- cluster_connect_cpg
 
- send_cluster_message_cs
 
- send_cluster_text
 
- text2msg_type
 
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 #include <crm_internal.h>
  11 #include <bzlib.h>
  12 #include <sys/socket.h>
  13 #include <netinet/in.h>
  14 #include <arpa/inet.h>
  15 #include <netdb.h>
  16 
  17 #include <crm/common/ipc.h>
  18 #include <crm/cluster/internal.h>
  19 #include <crm/common/mainloop.h>
  20 #include <sys/utsname.h>
  21 
  22 #include <qb/qbipcc.h>
  23 #include <qb/qbutil.h>
  24 
  25 #include <corosync/corodefs.h>
  26 #include <corosync/corotypes.h>
  27 #include <corosync/hdb.h>
  28 #include <corosync/cpg.h>
  29 
  30 #include <crm/msg_xml.h>
  31 
  32 #include <crm/common/ipc_internal.h>  
  33 
  34 cpg_handle_t pcmk_cpg_handle = 0; 
  35 
  36 static bool cpg_evicted = FALSE;
  37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
  38 
  39 #define cs_repeat(counter, max, code) do {              \
  40         code;                                           \
  41         if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
  42             counter++;                                  \
  43             crm_debug("Retrying operation after %ds", counter); \
  44             sleep(counter);                             \
  45         } else {                                        \
  46             break;                                      \
  47         }                                               \
  48     } while(counter < max)
  49 
  50 void
  51 cluster_disconnect_cpg(crm_cluster_t *cluster)
     
  52 {
  53     pcmk_cpg_handle = 0;
  54     if (cluster->cpg_handle) {
  55         crm_trace("Disconnecting CPG");
  56         cpg_leave(cluster->cpg_handle, &cluster->group);
  57         cpg_finalize(cluster->cpg_handle);
  58         cluster->cpg_handle = 0;
  59 
  60     } else {
  61         crm_info("No CPG connection");
  62     }
  63 }
  64 
  65 uint32_t get_local_nodeid(cpg_handle_t handle)
     
  66 {
  67     cs_error_t rc = CS_OK;
  68     int retries = 0;
  69     static uint32_t local_nodeid = 0;
  70     cpg_handle_t local_handle = handle;
  71     cpg_callbacks_t cb = { };
  72     int fd = -1;
  73     uid_t found_uid = 0;
  74     gid_t found_gid = 0;
  75     pid_t found_pid = 0;
  76     int rv;
  77 
  78     if(local_nodeid != 0) {
  79         return local_nodeid;
  80     }
  81 
  82     if(handle == 0) {
  83         crm_trace("Creating connection");
  84         cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
  85         if (rc != CS_OK) {
  86             crm_err("Could not connect to the CPG API: %s (%d)",
  87                     cs_strerror(rc), rc);
  88             return 0;
  89         }
  90 
  91         rc = cpg_fd_get(local_handle, &fd);
  92         if (rc != CS_OK) {
  93             crm_err("Could not obtain the CPG API connection: %s (%d)",
  94                     cs_strerror(rc), rc);
  95             goto bail;
  96         }
  97 
  98         
  99         if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 100                                                 &found_uid, &found_gid))) {
 101             crm_err("CPG provider is not authentic:"
 102                     " process %lld (uid: %lld, gid: %lld)",
 103                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 104                     (long long) found_uid, (long long) found_gid);
 105             goto bail;
 106         } else if (rv < 0) {
 107             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 108                     strerror(-rv), -rv);
 109             goto bail;
 110         }
 111     }
 112 
 113     if (rc == CS_OK) {
 114         retries = 0;
 115         crm_trace("Performing lookup");
 116         cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
 117     }
 118 
 119     if (rc != CS_OK) {
 120         crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
 121     }
 122 
 123 bail:
 124     if(handle == 0) {
 125         crm_trace("Closing connection");
 126         cpg_finalize(local_handle);
 127     }
 128     crm_debug("Local nodeid is %u", local_nodeid);
 129     return local_nodeid;
 130 }
 131 
 132 
 133 GListPtr cs_message_queue = NULL;
 134 int cs_message_timer = 0;
 135 
 136 static ssize_t crm_cs_flush(gpointer data);
 137 
 138 static gboolean
 139 crm_cs_flush_cb(gpointer data)
     
 140 {
 141     cs_message_timer = 0;
 142     crm_cs_flush(data);
 143     return FALSE;
 144 }
 145 
 146 #define CS_SEND_MAX 200
 147 static ssize_t
 148 crm_cs_flush(gpointer data)
     
 149 {
 150     int sent = 0;
 151     ssize_t rc = 0;
 152     int queue_len = 0;
 153     static unsigned int last_sent = 0;
 154     cpg_handle_t *handle = (cpg_handle_t *)data;
 155 
 156     if (*handle == 0) {
 157         crm_trace("Connection is dead");
 158         return pcmk_ok;
 159     }
 160 
 161     queue_len = g_list_length(cs_message_queue);
 162     if ((queue_len % 1000) == 0 && queue_len > 1) {
 163         crm_err("CPG queue has grown to %d", queue_len);
 164 
 165     } else if (queue_len == CS_SEND_MAX) {
 166         crm_warn("CPG queue has grown to %d", queue_len);
 167     }
 168 
 169     if (cs_message_timer) {
 170         
 171         crm_trace("Timer active %d", cs_message_timer);
 172         return pcmk_ok;
 173     }
 174 
 175     while (cs_message_queue && sent < CS_SEND_MAX) {
 176         struct iovec *iov = cs_message_queue->data;
 177 
 178         errno = 0;
 179         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 180 
 181         if (rc != CS_OK) {
 182             break;
 183         }
 184 
 185         sent++;
 186         last_sent++;
 187         crm_trace("CPG message sent, size=%llu",
 188                   (unsigned long long) iov->iov_len);
 189 
 190         cs_message_queue = g_list_remove(cs_message_queue, iov);
 191         free(iov->iov_base);
 192         free(iov);
 193     }
 194 
 195     queue_len -= sent;
 196     if (sent > 1 || cs_message_queue) {
 197         crm_info("Sent %d CPG messages  (%d remaining, last=%u): %s (%lld)",
 198                  sent, queue_len, last_sent, ais_error2text(rc),
 199                  (long long) rc);
 200     } else {
 201         crm_trace("Sent %d CPG messages  (%d remaining, last=%u): %s (%lld)",
 202                   sent, queue_len, last_sent, ais_error2text(rc),
 203                   (long long) rc);
 204     }
 205 
 206     if (cs_message_queue) {
 207         uint32_t delay_ms = 100;
 208         if(rc != CS_OK) {
 209             
 210             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 211         }
 212         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 213     }
 214 
 215     return rc;
 216 }
 217 
 218 gboolean
 219 send_cpg_iov(struct iovec * iov)
     
 220 {
 221     static unsigned int queued = 0;
 222 
 223     queued++;
 224     crm_trace("Queueing CPG message %u (%llu bytes)",
 225               queued, (unsigned long long) iov->iov_len);
 226     cs_message_queue = g_list_append(cs_message_queue, iov);
 227     crm_cs_flush(&pcmk_cpg_handle);
 228     return TRUE;
 229 }
 230 
 231 static int
 232 pcmk_cpg_dispatch(gpointer user_data)
     
 233 {
 234     int rc = 0;
 235     crm_cluster_t *cluster = (crm_cluster_t*) user_data;
 236 
 237     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 238     if (rc != CS_OK) {
 239         crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
 240         cpg_finalize(cluster->cpg_handle);
 241         cluster->cpg_handle = 0;
 242         return -1;
 243 
 244     } else if(cpg_evicted) {
 245         crm_err("Evicted from CPG membership");
 246         return -1;
 247     }
 248     return 0;
 249 }
 250 
 251 char *
 252 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
     
 253                         uint32_t *kind, const char **from)
 254 {
 255     char *data = NULL;
 256     AIS_Message *msg = (AIS_Message *) content;
 257 
 258     if(handle) {
 259         
 260         uint32_t local_nodeid = get_local_nodeid(handle);
 261         const char *local_name = get_local_node_name();
 262 
 263         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
 264             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
 265             return NULL;
 266 
 267         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
 268             
 269             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
 270             return NULL;
 271         } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
 272             
 273             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 274             return NULL;
 275         }
 276 
 277         msg->sender.id = nodeid;
 278         if (msg->sender.size == 0) {
 279             crm_node_t *peer = crm_get_peer(nodeid, NULL);
 280 
 281             if (peer == NULL) {
 282                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 283 
 284             } else if (peer->uname == NULL) {
 285                 crm_err("No uname for peer with nodeid=%u", nodeid);
 286 
 287             } else {
 288                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
 289                 msg->sender.size = strlen(peer->uname);
 290                 memset(msg->sender.uname, 0, MAX_NAME);
 291                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 292             }
 293         }
 294     }
 295 
 296     crm_trace("Got new%s message (size=%d, %d, %d)",
 297               msg->is_compressed ? " compressed" : "",
 298               ais_data_len(msg), msg->size, msg->compressed_size);
 299 
 300     if (kind != NULL) {
 301         *kind = msg->header.id;
 302     }
 303     if (from != NULL) {
 304         *from = msg->sender.uname;
 305     }
 306 
 307     if (msg->is_compressed && msg->size > 0) {
 308         int rc = BZ_OK;
 309         char *uncompressed = NULL;
 310         unsigned int new_size = msg->size + 1;
 311 
 312         if (check_message_sanity(msg, NULL) == FALSE) {
 313             goto badmsg;
 314         }
 315 
 316         crm_trace("Decompressing message data");
 317         uncompressed = calloc(1, new_size);
 318         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 319 
 320         if (rc != BZ_OK) {
 321             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
 322                     bz2_strerror(rc), rc);
 323             free(uncompressed);
 324             goto badmsg;
 325         }
 326 
 327         CRM_ASSERT(rc == BZ_OK);
 328         CRM_ASSERT(new_size == msg->size);
 329 
 330         data = uncompressed;
 331 
 332     } else if (check_message_sanity(msg, data) == FALSE) {
 333         goto badmsg;
 334 
 335     } else if (pcmk__str_eq("identify", data, pcmk__str_casei)) {
 336         char *pid_s = pcmk__getpid_s();
 337 
 338         send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
 339         free(pid_s);
 340         return NULL;
 341 
 342     } else {
 343         data = strdup(msg->data);
 344     }
 345 
 346     
 347     crm_get_peer(msg->sender.id, msg->sender.uname);
 348 
 349     crm_trace("Payload: %.200s", data);
 350     return data;
 351 
 352   badmsg:
 353     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 354             " min=%d, total=%d, size=%d, bz2_size=%d",
 355             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 356             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 357             msg->sender.pid, (int)sizeof(AIS_Message),
 358             msg->header.size, msg->size, msg->compressed_size);
 359 
 360     free(data);
 361     return NULL;
 362 }
 363 
 364 static int cmp_member_list_nodeid(const void *first,
     
 365                                   const void *second)
 366 {
 367     const struct cpg_address *const a = *((const struct cpg_address **) first),
 368                              *const b = *((const struct cpg_address **) second);
 369     if (a->nodeid < b->nodeid) {
 370         return -1;
 371     } else if (a->nodeid > b->nodeid) {
 372         return 1;
 373     }
 374     
 375     return 0;
 376 }
 377 
 378 static const char *
 379 cpgreason2str(cpg_reason_t reason)
     
 380 {
 381     switch (reason) {
 382         case CPG_REASON_JOIN:       return " via cpg_join";
 383         case CPG_REASON_LEAVE:      return " via cpg_leave";
 384         case CPG_REASON_NODEDOWN:   return " via cluster exit";
 385         case CPG_REASON_NODEUP:     return " via cluster join";
 386         case CPG_REASON_PROCDOWN:   return " for unknown reason";
 387         default:                    break;
 388     }
 389     return "";
 390 }
 391 
 392 static inline const char *
 393 peer_name(crm_node_t *peer)
     
 394 {
 395     if (peer == NULL) {
 396         return "unknown node";
 397     } else if (peer->uname == NULL) {
 398         return "peer node";
 399     } else {
 400         return peer->uname;
 401     }
 402 }
 403 
 404 void
 405 pcmk_cpg_membership(cpg_handle_t handle,
     
 406                     const struct cpg_name *groupName,
 407                     const struct cpg_address *member_list, size_t member_list_entries,
 408                     const struct cpg_address *left_list, size_t left_list_entries,
 409                     const struct cpg_address *joined_list, size_t joined_list_entries)
 410 {
 411     int i;
 412     gboolean found = FALSE;
 413     static int counter = 0;
 414     uint32_t local_nodeid = get_local_nodeid(handle);
 415     const struct cpg_address *key, **sorted;
 416 
 417     sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
 418     CRM_ASSERT(sorted != NULL);
 419 
 420     for (size_t iter = 0; iter < member_list_entries; iter++) {
 421         sorted[iter] = member_list + iter;
 422     }
 423     
 424     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
 425           cmp_member_list_nodeid);
 426 
 427     for (i = 0; i < left_list_entries; i++) {
 428         crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
 429         const struct cpg_address **rival = NULL;
 430 
 431         
 432 
 433 
 434 
 435 
 436 
 437 
 438 
 439 
 440 
 441 
 442 
 443 
 444 
 445 
 446         if (peer) {
 447             key = &left_list[i];
 448             rival = bsearch(&key, sorted, member_list_entries,
 449                             sizeof(const struct cpg_address *),
 450                             cmp_member_list_nodeid);
 451         }
 452 
 453         if (rival == NULL) {
 454             crm_info("Group %s event %d: %s (node %u pid %u) left%s",
 455                      groupName->value, counter, peer_name(peer),
 456                      left_list[i].nodeid, left_list[i].pid,
 457                      cpgreason2str(left_list[i].reason));
 458             if (peer) {
 459                 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 460                                      OFFLINESTATUS);
 461             }
 462         } else if (left_list[i].nodeid == local_nodeid) {
 463             crm_warn("Group %s event %d: duplicate local pid %u left%s",
 464                      groupName->value, counter,
 465                      left_list[i].pid, cpgreason2str(left_list[i].reason));
 466         } else {
 467             crm_warn("Group %s event %d: "
 468                      "%s (node %u) duplicate pid %u left%s (%u remains)",
 469                      groupName->value, counter, peer_name(peer),
 470                      left_list[i].nodeid, left_list[i].pid,
 471                      cpgreason2str(left_list[i].reason), (*rival)->pid);
 472         }
 473     }
 474     free(sorted);
 475     sorted = NULL;
 476 
 477     for (i = 0; i < joined_list_entries; i++) {
 478         crm_info("Group %s event %d: node %u pid %u joined%s",
 479                  groupName->value, counter, joined_list[i].nodeid,
 480                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
 481     }
 482 
 483     for (i = 0; i < member_list_entries; i++) {
 484         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
 485 
 486         if (member_list[i].nodeid == local_nodeid
 487                 && member_list[i].pid != getpid()) {
 488             
 489             crm_warn("Group %s event %d: detected duplicate local pid %u",
 490                      groupName->value, counter, member_list[i].pid);
 491             continue;
 492         }
 493         crm_info("Group %s event %d: %s (node %u pid %u) is member",
 494                  groupName->value, counter, peer_name(peer),
 495                  member_list[i].nodeid, member_list[i].pid);
 496 
 497         
 498 
 499 
 500         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
 501                                     ONLINESTATUS);
 502 
 503         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
 504             
 505 
 506 
 507 
 508 
 509 
 510 
 511             time_t now = time(NULL);
 512 
 513             if (peer->when_lost == 0) {
 514                 
 515                 peer->when_lost = now;
 516 
 517             } else if (now > (peer->when_lost + 60)) {
 518                 
 519                 crm_warn("Node %u is member of group %s but was believed offline",
 520                          member_list[i].nodeid, groupName->value);
 521                 crm_update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
 522             }
 523         }
 524 
 525         if (local_nodeid == member_list[i].nodeid) {
 526             found = TRUE;
 527         }
 528     }
 529 
 530     if (!found) {
 531         crm_err("Local node was evicted from group %s", groupName->value);
 532         cpg_evicted = TRUE;
 533     }
 534 
 535     counter++;
 536 }
 537 
 538 gboolean
 539 cluster_connect_cpg(crm_cluster_t *cluster)
     
 540 {
 541     cs_error_t rc;
 542     int fd = -1;
 543     int retries = 0;
 544     uint32_t id = 0;
 545     crm_node_t *peer = NULL;
 546     cpg_handle_t handle = 0;
 547     const char *message_name = pcmk__message_name(crm_system_name);
 548     uid_t found_uid = 0;
 549     gid_t found_gid = 0;
 550     pid_t found_pid = 0;
 551     int rv;
 552 
 553     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 554         .dispatch = pcmk_cpg_dispatch,
 555         .destroy = cluster->destroy,
 556     };
 557 
 558     cpg_callbacks_t cpg_callbacks = {
 559         .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 560         .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 561         
 562         
 563     };
 564 
 565     cpg_evicted = FALSE;
 566     cluster->group.length = 0;
 567     cluster->group.value[0] = 0;
 568 
 569     
 570     strncpy(cluster->group.value, message_name, 127);
 571     cluster->group.value[127] = 0;
 572     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 573 
 574     cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
 575     if (rc != CS_OK) {
 576         crm_err("Could not connect to the CPG API: %s (%d)",
 577                 cs_strerror(rc), rc);
 578         goto bail;
 579     }
 580 
 581     rc = cpg_fd_get(handle, &fd);
 582     if (rc != CS_OK) {
 583         crm_err("Could not obtain the CPG API connection: %s (%d)",
 584                 cs_strerror(rc), rc);
 585         goto bail;
 586     }
 587 
 588     
 589     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
 590                                             &found_uid, &found_gid))) {
 591         crm_err("CPG provider is not authentic:"
 592                 " process %lld (uid: %lld, gid: %lld)",
 593                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
 594                 (long long) found_uid, (long long) found_gid);
 595         rc = CS_ERR_ACCESS;
 596         goto bail;
 597     } else if (rv < 0) {
 598         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
 599                 strerror(-rv), -rv);
 600         rc = CS_ERR_ACCESS;
 601         goto bail;
 602     }
 603 
 604     id = get_local_nodeid(handle);
 605     if (id == 0) {
 606         crm_err("Could not get local node id from the CPG API");
 607         goto bail;
 608 
 609     }
 610     cluster->nodeid = id;
 611 
 612     retries = 0;
 613     cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
 614     if (rc != CS_OK) {
 615         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
 616         goto bail;
 617     }
 618 
 619     pcmk_cpg_handle = handle;
 620     cluster->cpg_handle = handle;
 621     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 622 
 623   bail:
 624     if (rc != CS_OK) {
 625         cpg_finalize(handle);
 626         return FALSE;
 627     }
 628 
 629     peer = crm_get_peer(id, NULL);
 630     crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
 631     return TRUE;
 632 }
 633 
 634 gboolean
 635 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
     
 636 {
 637     gboolean rc = TRUE;
 638     char *data = NULL;
 639 
 640     data = dump_xml_unformatted(msg);
 641     rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
 642     free(data);
 643     return rc;
 644 }
 645 
 646 gboolean
 647 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
     
 648                   gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
 649 {
 650     static int msg_id = 0;
 651     static int local_pid = 0;
 652     static int local_name_len = 0;
 653     static const char *local_name = NULL;
 654 
 655     char *target = NULL;
 656     struct iovec *iov;
 657     AIS_Message *msg = NULL;
 658     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 659 
 660     switch (msg_class) {
 661         case crm_class_cluster:
 662             break;
 663         default:
 664             crm_err("Invalid message class: %d", msg_class);
 665             return FALSE;
 666     }
 667 
 668     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 669 
 670     if(local_name == NULL) {
 671         local_name = get_local_node_name();
 672     }
 673     if(local_name_len == 0 && local_name) {
 674         local_name_len = strlen(local_name);
 675     }
 676 
 677     if (data == NULL) {
 678         data = "";
 679     }
 680 
 681     if (local_pid == 0) {
 682         local_pid = getpid();
 683     }
 684 
 685     if (sender == crm_msg_none) {
 686         sender = local_pid;
 687     }
 688 
 689     msg = calloc(1, sizeof(AIS_Message));
 690 
 691     msg_id++;
 692     msg->id = msg_id;
 693     msg->header.id = msg_class;
 694     msg->header.error = CS_OK;
 695 
 696     msg->host.type = dest;
 697     msg->host.local = local;
 698 
 699     if (node) {
 700         if (node->uname) {
 701             target = strdup(node->uname);
 702             msg->host.size = strlen(node->uname);
 703             memset(msg->host.uname, 0, MAX_NAME);
 704             memcpy(msg->host.uname, node->uname, msg->host.size);
 705         } else {
 706             target = crm_strdup_printf("%u", node->id);
 707         }
 708         msg->host.id = node->id;
 709     } else {
 710         target = strdup("all");
 711     }
 712 
 713     msg->sender.id = 0;
 714     msg->sender.type = sender;
 715     msg->sender.pid = local_pid;
 716     msg->sender.size = local_name_len;
 717     memset(msg->sender.uname, 0, MAX_NAME);
 718     if(local_name && msg->sender.size) {
 719         memcpy(msg->sender.uname, local_name, msg->sender.size);
 720     }
 721 
 722     msg->size = 1 + strlen(data);
 723     msg->header.size = sizeof(AIS_Message) + msg->size;
 724 
 725     if (msg->size < CRM_BZ2_THRESHOLD) {
 726         msg = pcmk__realloc(msg, msg->header.size);
 727         memcpy(msg->data, data, msg->size);
 728 
 729     } else {
 730         char *compressed = NULL;
 731         unsigned int new_size = 0;
 732         char *uncompressed = strdup(data);
 733 
 734         if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
 735                            &compressed, &new_size) == pcmk_rc_ok) {
 736 
 737             msg->header.size = sizeof(AIS_Message) + new_size;
 738             msg = pcmk__realloc(msg, msg->header.size);
 739             memcpy(msg->data, compressed, new_size);
 740 
 741             msg->is_compressed = TRUE;
 742             msg->compressed_size = new_size;
 743 
 744         } else {
 745             
 746             
 747             msg = pcmk__realloc(msg, msg->header.size);
 748             memcpy(msg->data, data, msg->size);
 749         }
 750 
 751         free(uncompressed);
 752         free(compressed);
 753     }
 754 
 755     iov = calloc(1, sizeof(struct iovec));
 756     iov->iov_base = msg;
 757     iov->iov_len = msg->header.size;
 758 
 759     if (msg->compressed_size) {
 760         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
 761                   msg->id, target, (unsigned long long) iov->iov_len,
 762                   msg->compressed_size, data);
 763     } else {
 764         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
 765                   msg->id, target, (unsigned long long) iov->iov_len,
 766                   msg->size, data);
 767     }
 768     free(target);
 769 
 770     send_cpg_iov(iov);
 771 
 772     return TRUE;
 773 }
 774 
 775 enum crm_ais_msg_types
 776 text2msg_type(const char *text)
     
 777 {
 778     int type = crm_msg_none;
 779 
 780     CRM_CHECK(text != NULL, return type);
 781     text = pcmk__message_name(text);
 782     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
 783         type = crm_msg_ais;
 784     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
 785         type = crm_msg_cib;
 786     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
 787         type = crm_msg_crmd;
 788     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
 789         type = crm_msg_te;
 790     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
 791         type = crm_msg_pe;
 792     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
 793         type = crm_msg_lrmd;
 794     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
 795         type = crm_msg_stonithd;
 796     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
 797         type = crm_msg_stonith_ng;
 798     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
 799         type = crm_msg_attrd;
 800 
 801     } else {
 802         
 803 
 804 
 805         int scan_rc = sscanf(text, "%d", &type);
 806 
 807         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
 808             
 809             type = crm_msg_none;
 810         }
 811     }
 812     return type;
 813 }