root/lib/cluster/cpg.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. cluster_disconnect_cpg
  2. get_local_nodeid
  3. crm_cs_flush_cb
  4. crm_cs_flush
  5. send_cpg_iov
  6. pcmk_cpg_dispatch
  7. pcmk_message_common_cs
  8. pcmk_cpg_membership
  9. cluster_connect_cpg
  10. send_cluster_message_cs
  11. send_cluster_text
  12. text2msg_type

   1 /*
   2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
   3  *
   4  * This library is free software; you can redistribute it and/or
   5  * modify it under the terms of the GNU Lesser General Public
   6  * License as published by the Free Software Foundation; either
   7  * version 2.1 of the License, or (at your option) any later version.
   8  *
   9  * This library is distributed in the hope that it will be useful,
  10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  12  * Lesser General Public License for more details.
  13  *
  14  * You should have received a copy of the GNU Lesser General Public
  15  * License along with this library; if not, write to the Free Software
  16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  17  */
  18 
  19 #include <crm_internal.h>
  20 #include <bzlib.h>
  21 #include <sys/socket.h>
  22 #include <netinet/in.h>
  23 #include <arpa/inet.h>
  24 #include <netdb.h>
  25 
  26 #include <crm/common/ipc.h>
  27 #include <crm/cluster/internal.h>
  28 #include <crm/common/mainloop.h>
  29 #include <sys/utsname.h>
  30 
  31 #include <qb/qbipcc.h>
  32 #include <qb/qbutil.h>
  33 
  34 #include <corosync/corodefs.h>
  35 #include <corosync/corotypes.h>
  36 #include <corosync/hdb.h>
  37 #include <corosync/cpg.h>
  38 
  39 #include <crm/msg_xml.h>
  40 
      /* File-wide copy of the active CPG handle.  cluster_connect_cpg() stores
       * the handle here on success, cluster_disconnect_cpg() resets it to 0,
       * and send_cpg_iov() passes its address to crm_cs_flush(). */
   41 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
  42 
      /* Set to TRUE by pcmk_cpg_membership() when the local node id is missing
       * from the member list; cleared by cluster_connect_cpg() and checked by
       * pcmk_cpg_dispatch(), which then reports failure. */
   43 static bool cpg_evicted = FALSE;
      /* Optional dispatch hook taking (kind, from, data).  It is only defined
       * here; no function visible in this file assigns or calls it. */
   44 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
  45 
  46 #define cs_repeat(counter, max, code) do {              \
  47         code;                                           \
  48         if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
  49             counter++;                                  \
  50             crm_debug("Retrying operation after %ds", counter); \
  51             sleep(counter);                             \
  52         } else {                                        \
  53             break;                                      \
  54         }                                               \
  55     } while(counter < max)
  56 
  57 void
  58 cluster_disconnect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
  59 {
  60     pcmk_cpg_handle = 0;
  61     if (cluster->cpg_handle) {
  62         crm_trace("Disconnecting CPG");
  63         cpg_leave(cluster->cpg_handle, &cluster->group);
  64         cpg_finalize(cluster->cpg_handle);
  65         cluster->cpg_handle = 0;
  66 
  67     } else {
  68         crm_info("No CPG connection");
  69     }
  70 }
  71 
  72 uint32_t get_local_nodeid(cpg_handle_t handle)
     /* [previous][next][first][last][top][bottom][index][help] */
  73 {
  74     int rc = CS_OK;
  75     int retries = 0;
  76     static uint32_t local_nodeid = 0;
  77     cpg_handle_t local_handle = handle;
  78     cpg_callbacks_t cb = { };
  79 
  80     if(local_nodeid != 0) {
  81         return local_nodeid;
  82     }
  83 
  84 #if 0
  85     /* Should not be necessary */
  86     if(get_cluster_type() == pcmk_cluster_classic_ais) {
  87         get_ais_details(&local_nodeid, NULL);
  88         goto done;
  89     }
  90 #endif
  91 
  92     if(handle == 0) {
  93         crm_trace("Creating connection");
  94         cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
  95     }
  96 
  97     if (rc == CS_OK) {
  98         retries = 0;
  99         crm_trace("Performing lookup");
 100         cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
 101     }
 102 
 103     if (rc != CS_OK) {
 104         crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
 105     }
 106     if(handle == 0) {
 107         crm_trace("Closing connection");
 108         cpg_finalize(local_handle);
 109     }
 110     crm_debug("Local nodeid is %u", local_nodeid);
 111     return local_nodeid;
 112 }
 113 
 114 
      /* Outgoing messages waiting to be multicast.  Each element is a
       * 'struct iovec *' appended by send_cpg_iov(); crm_cs_flush() sends
       * entries from the head and frees both the iovec and its iov_base. */
  115 GListPtr cs_message_queue = NULL;
      /* Id returned by g_timeout_add() for the pending retry of crm_cs_flush(),
       * or 0 when no retry is scheduled (reset by crm_cs_flush_cb()). */
  116 int cs_message_timer = 0;
 117 
 118 static ssize_t crm_cs_flush(gpointer data);
 119 
 120 static gboolean
 121 crm_cs_flush_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 122 {
 123     cs_message_timer = 0;
 124     crm_cs_flush(data);
 125     return FALSE;
 126 }
 127 
 128 #define CS_SEND_MAX 200
 129 static ssize_t
 130 crm_cs_flush(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 131 {
 132     int sent = 0;
 133     ssize_t rc = 0;
 134     int queue_len = 0;
 135     static unsigned int last_sent = 0;
 136     cpg_handle_t *handle = (cpg_handle_t *)data;
 137 
 138     if (*handle == 0) {
 139         crm_trace("Connection is dead");
 140         return pcmk_ok;
 141     }
 142 
 143     queue_len = g_list_length(cs_message_queue);
 144     if ((queue_len % 1000) == 0 && queue_len > 1) {
 145         crm_err("CPG queue has grown to %d", queue_len);
 146 
 147     } else if (queue_len == CS_SEND_MAX) {
 148         crm_warn("CPG queue has grown to %d", queue_len);
 149     }
 150 
 151     if (cs_message_timer) {
 152         /* There is already a timer, wait until it goes off */
 153         crm_trace("Timer active %d", cs_message_timer);
 154         return pcmk_ok;
 155     }
 156 
 157     while (cs_message_queue && sent < CS_SEND_MAX) {
 158         struct iovec *iov = cs_message_queue->data;
 159 
 160         errno = 0;
 161         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
 162 
 163         if (rc != CS_OK) {
 164             break;
 165         }
 166 
 167         sent++;
 168         last_sent++;
 169         crm_trace("CPG message sent, size=%llu",
 170                   (unsigned long long) iov->iov_len);
 171 
 172         cs_message_queue = g_list_remove(cs_message_queue, iov);
 173         free(iov->iov_base);
 174         free(iov);
 175     }
 176 
 177     queue_len -= sent;
 178     if (sent > 1 || cs_message_queue) {
 179         crm_info("Sent %d CPG messages  (%d remaining, last=%u): %s (%lld)",
 180                  sent, queue_len, last_sent, ais_error2text(rc),
 181                  (long long) rc);
 182     } else {
 183         crm_trace("Sent %d CPG messages  (%d remaining, last=%u): %s (%lld)",
 184                   sent, queue_len, last_sent, ais_error2text(rc),
 185                   (long long) rc);
 186     }
 187 
 188     if (cs_message_queue) {
 189         uint32_t delay_ms = 100;
 190         if(rc != CS_OK) {
 191             /* Proportionally more if sending failed but cap at 1s */
 192             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
 193         }
 194         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
 195     }
 196 
 197     return rc;
 198 }
 199 
 200 gboolean
 201 send_cpg_iov(struct iovec * iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 202 {
 203     static unsigned int queued = 0;
 204 
 205     queued++;
 206     crm_trace("Queueing CPG message %u (%llu bytes)",
 207               queued, (unsigned long long) iov->iov_len);
 208     cs_message_queue = g_list_append(cs_message_queue, iov);
 209     crm_cs_flush(&pcmk_cpg_handle);
 210     return TRUE;
 211 }
 212 
 213 static int
 214 pcmk_cpg_dispatch(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 215 {
 216     int rc = 0;
 217     crm_cluster_t *cluster = (crm_cluster_t*) user_data;
 218 
 219     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
 220     if (rc != CS_OK) {
 221         crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
 222         cluster->cpg_handle = 0;
 223         return -1;
 224 
 225     } else if(cpg_evicted) {
 226         crm_err("Evicted from CPG membership");
 227         return -1;
 228     }
 229     return 0;
 230 }
 231 
 232 char *
 233 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
     /* [previous][next][first][last][top][bottom][index][help] */
 234                         uint32_t *kind, const char **from)
 235 {
 236     char *data = NULL;
 237     AIS_Message *msg = (AIS_Message *) content;
 238 
 239     if(handle) {
 240         /* 'msg' came from CPG not the plugin
 241          * Do filtering and field massaging
 242          */
 243         uint32_t local_nodeid = get_local_nodeid(handle);
 244         const char *local_name = get_local_node_name();
 245 
 246         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
 247             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
 248             return NULL;
 249 
 250         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
 251             /* Not for us */
 252             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
 253             return NULL;
 254         } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
 255             /* Not for us */
 256             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
 257             return NULL;
 258         }
 259 
 260         msg->sender.id = nodeid;
 261         if (msg->sender.size == 0) {
 262             crm_node_t *peer = crm_get_peer(nodeid, NULL);
 263 
 264             if (peer == NULL) {
 265                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 266 
 267             } else if (peer->uname == NULL) {
 268                 crm_err("No uname for peer with nodeid=%u", nodeid);
 269 
 270             } else {
 271                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
 272                 msg->sender.size = strlen(peer->uname);
 273                 memset(msg->sender.uname, 0, MAX_NAME);
 274                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
 275             }
 276         }
 277     }
 278 
 279     crm_trace("Got new%s message (size=%d, %d, %d)",
 280               msg->is_compressed ? " compressed" : "",
 281               ais_data_len(msg), msg->size, msg->compressed_size);
 282 
 283     if (kind != NULL) {
 284         *kind = msg->header.id;
 285     }
 286     if (from != NULL) {
 287         *from = msg->sender.uname;
 288     }
 289 
 290     if (msg->is_compressed && msg->size > 0) {
 291         int rc = BZ_OK;
 292         char *uncompressed = NULL;
 293         unsigned int new_size = msg->size + 1;
 294 
 295         if (check_message_sanity(msg, NULL) == FALSE) {
 296             goto badmsg;
 297         }
 298 
 299         crm_trace("Decompressing message data");
 300         uncompressed = calloc(1, new_size);
 301         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 302 
 303         if (rc != BZ_OK) {
 304             crm_err("Decompression failed: %d", rc);
 305             free(uncompressed);
 306             goto badmsg;
 307         }
 308 
 309         CRM_ASSERT(rc == BZ_OK);
 310         CRM_ASSERT(new_size == msg->size);
 311 
 312         data = uncompressed;
 313 
 314     } else if (check_message_sanity(msg, data) == FALSE) {
 315         goto badmsg;
 316 
 317     } else if (safe_str_eq("identify", data)) {
 318         char *pid_s = crm_getpid_s();
 319 
 320         send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
 321         free(pid_s);
 322         return NULL;
 323 
 324     } else {
 325         data = strdup(msg->data);
 326     }
 327 
 328     if (msg->header.id != crm_class_members) {
 329         /* Is this even needed anymore? */
 330         crm_get_peer(msg->sender.id, msg->sender.uname);
 331     }
 332 
 333     if (msg->header.id == crm_class_rmpeer) {
 334         uint32_t id = crm_int_helper(data, NULL);
 335 
 336         crm_info("Removing peer %s/%u", data, id);
 337         reap_crm_member(id, NULL);
 338         free(data);
 339         return NULL;
 340 
 341 #if SUPPORT_PLUGIN
 342     } else if (is_classic_ais_cluster()) {
 343         plugin_handle_membership(msg);
 344 #endif
 345     }
 346 
 347     crm_trace("Payload: %.200s", data);
 348     return data;
 349 
 350   badmsg:
 351     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
 352             " min=%d, total=%d, size=%d, bz2_size=%d",
 353             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
 354             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
 355             msg->sender.pid, (int)sizeof(AIS_Message),
 356             msg->header.size, msg->size, msg->compressed_size);
 357 
 358     free(data);
 359     return NULL;
 360 }
 361 
 362 void
 363 pcmk_cpg_membership(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
 364                     const struct cpg_name *groupName,
 365                     const struct cpg_address *member_list, size_t member_list_entries,
 366                     const struct cpg_address *left_list, size_t left_list_entries,
 367                     const struct cpg_address *joined_list, size_t joined_list_entries)
 368 {
 369     int i;
 370     gboolean found = FALSE;
 371     static int counter = 0;
 372     uint32_t local_nodeid = get_local_nodeid(handle);
 373 
 374     for (i = 0; i < left_list_entries; i++) {
 375         crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
 376 
 377         crm_info("Node %u left group %s (peer=%s, counter=%d.%d)",
 378                  left_list[i].nodeid, groupName->value,
 379                  (peer? peer->uname : "<none>"), counter, i);
 380         if (peer) {
 381             crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
 382         }
 383     }
 384 
 385     for (i = 0; i < joined_list_entries; i++) {
 386         crm_info("Node %u joined group %s (counter=%d.%d)",
 387                  joined_list[i].nodeid, groupName->value, counter, i);
 388     }
 389 
 390     for (i = 0; i < member_list_entries; i++) {
 391         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
 392 
 393         crm_info("Node %u still member of group %s (peer=%s, counter=%d.%d)",
 394                  member_list[i].nodeid, groupName->value,
 395                  (peer? peer->uname : "<none>"), counter, i);
 396 
 397         /* Anyone that is sending us CPG messages must also be a _CPG_ member.
 398          * But it's _not_ safe to assume it's in the quorum membership.
 399          * We may have just found out it's dead and are processing the last couple of messages it sent
 400          */
 401         peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
 402         if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
 403             time_t now = time(NULL);
 404 
 405             /* Co-opt the otherwise unused votes field */
 406             if(peer->votes == 0) {
 407                 peer->votes = now;
 408 
 409             } else if(now > (60 + peer->votes)) {
 410                 /* On the otherhand, if we're still getting messages, at a certain point
 411                  * we need to acknowledge our internal cache is probably wrong
 412                  *
 413                  * Set the threshold to 1 minute
 414                  */
 415                 crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
 416                 if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) {
 417                     peer->votes = 0;
 418                 }
 419             }
 420         }
 421 
 422         if (local_nodeid == member_list[i].nodeid) {
 423             found = TRUE;
 424         }
 425     }
 426 
 427     if (!found) {
 428         crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
 429         cpg_evicted = TRUE;
 430     }
 431 
 432     counter++;
 433 }
 434 
 435 gboolean
 436 cluster_connect_cpg(crm_cluster_t *cluster)
     /* [previous][next][first][last][top][bottom][index][help] */
 437 {
 438     int rc = -1;
 439     int fd = 0;
 440     int retries = 0;
 441     uint32_t id = 0;
 442     crm_node_t *peer = NULL;
 443     cpg_handle_t handle = 0;
 444 
 445     struct mainloop_fd_callbacks cpg_fd_callbacks = {
 446         .dispatch = pcmk_cpg_dispatch,
 447         .destroy = cluster->destroy,
 448     };
 449 
 450     cpg_callbacks_t cpg_callbacks = {
 451         .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 452         .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 453         /* .cpg_deliver_fn = pcmk_cpg_deliver, */
 454         /* .cpg_confchg_fn = pcmk_cpg_membership, */
 455     };
 456 
 457     cpg_evicted = FALSE;
 458     cluster->group.length = 0;
 459     cluster->group.value[0] = 0;
 460 
 461     /* group.value is char[128] */
 462     strncpy(cluster->group.value, crm_system_name?crm_system_name:"unknown", 127);
 463     cluster->group.value[127] = 0;
 464     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 465 
 466     cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
 467     if (rc != CS_OK) {
 468         crm_err("Could not connect to the Cluster Process Group API: %d", rc);
 469         goto bail;
 470     }
 471 
 472     id = get_local_nodeid(handle);
 473     if (id == 0) {
 474         crm_err("Could not get local node id from the CPG API");
 475         goto bail;
 476 
 477     }
 478     cluster->nodeid = id;
 479 
 480     retries = 0;
 481     cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
 482     if (rc != CS_OK) {
 483         crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
 484         goto bail;
 485     }
 486 
 487     rc = cpg_fd_get(handle, &fd);
 488     if (rc != CS_OK) {
 489         crm_err("Could not obtain the CPG API connection: %d", rc);
 490         goto bail;
 491     }
 492 
 493     pcmk_cpg_handle = handle;
 494     cluster->cpg_handle = handle;
 495     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 496 
 497   bail:
 498     if (rc != CS_OK) {
 499         cpg_finalize(handle);
 500         return FALSE;
 501     }
 502 
 503     peer = crm_get_peer(id, NULL);
 504     crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
 505     return TRUE;
 506 }
 507 
 508 gboolean
 509 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
     /* [previous][next][first][last][top][bottom][index][help] */
 510 {
 511     gboolean rc = TRUE;
 512     char *data = NULL;
 513 
 514     data = dump_xml_unformatted(msg);
 515     rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
 516     free(data);
 517     return rc;
 518 }
 519 
 520 gboolean
 521 send_cluster_text(int class, const char *data,
     /* [previous][next][first][last][top][bottom][index][help] */
 522               gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
 523 {
 524     static int msg_id = 0;
 525     static int local_pid = 0;
 526     static int local_name_len = 0;
 527     static const char *local_name = NULL;
 528 
 529     char *target = NULL;
 530     struct iovec *iov;
 531     AIS_Message *msg = NULL;
 532     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 533 
 534     /* There are only 6 handlers registered to crm_lib_service in plugin.c */
 535     CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
 536               return FALSE);
 537 
 538 #if !SUPPORT_PLUGIN
 539     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 540 #endif
 541 
 542     if(local_name == NULL) {
 543         local_name = get_local_node_name();
 544     }
 545     if(local_name_len == 0 && local_name) {
 546         local_name_len = strlen(local_name);
 547     }
 548 
 549     if (data == NULL) {
 550         data = "";
 551     }
 552 
 553     if (local_pid == 0) {
 554         local_pid = getpid();
 555     }
 556 
 557     if (sender == crm_msg_none) {
 558         sender = local_pid;
 559     }
 560 
 561     msg = calloc(1, sizeof(AIS_Message));
 562 
 563     msg_id++;
 564     msg->id = msg_id;
 565     msg->header.id = class;
 566     msg->header.error = CS_OK;
 567 
 568     msg->host.type = dest;
 569     msg->host.local = local;
 570 
 571     if (node) {
 572         if (node->uname) {
 573             target = strdup(node->uname);
 574             msg->host.size = strlen(node->uname);
 575             memset(msg->host.uname, 0, MAX_NAME);
 576             memcpy(msg->host.uname, node->uname, msg->host.size);
 577         } else {
 578             target = crm_strdup_printf("%u", node->id);
 579         }
 580         msg->host.id = node->id;
 581     } else {
 582         target = strdup("all");
 583     }
 584 
 585     msg->sender.id = 0;
 586     msg->sender.type = sender;
 587     msg->sender.pid = local_pid;
 588     msg->sender.size = local_name_len;
 589     memset(msg->sender.uname, 0, MAX_NAME);
 590     if(local_name && msg->sender.size) {
 591         memcpy(msg->sender.uname, local_name, msg->sender.size);
 592     }
 593 
 594     msg->size = 1 + strlen(data);
 595     msg->header.size = sizeof(AIS_Message) + msg->size;
 596 
 597     if (msg->size < CRM_BZ2_THRESHOLD) {
 598         msg = realloc_safe(msg, msg->header.size);
 599         memcpy(msg->data, data, msg->size);
 600 
 601     } else {
 602         char *compressed = NULL;
 603         unsigned int new_size = 0;
 604         char *uncompressed = strdup(data);
 605 
 606         if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
 607 
 608             msg->header.size = sizeof(AIS_Message) + new_size;
 609             msg = realloc_safe(msg, msg->header.size);
 610             memcpy(msg->data, compressed, new_size);
 611 
 612             msg->is_compressed = TRUE;
 613             msg->compressed_size = new_size;
 614 
 615         } else {
 616             msg = realloc_safe(msg, msg->header.size);
 617             memcpy(msg->data, data, msg->size);
 618         }
 619 
 620         free(uncompressed);
 621         free(compressed);
 622     }
 623 
 624     iov = calloc(1, sizeof(struct iovec));
 625     iov->iov_base = msg;
 626     iov->iov_len = msg->header.size;
 627 
 628     if (msg->compressed_size) {
 629         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
 630                   msg->id, target, (unsigned long long) iov->iov_len,
 631                   msg->compressed_size, data);
 632     } else {
 633         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
 634                   msg->id, target, (unsigned long long) iov->iov_len,
 635                   msg->size, data);
 636     }
 637     free(target);
 638 
 639 #if SUPPORT_PLUGIN
 640     /* The plugin is the only time we don't use CPG messaging */
 641     if(get_cluster_type() == pcmk_cluster_classic_ais) {
 642         return send_plugin_text(class, iov);
 643     }
 644 #endif
 645 
 646     send_cpg_iov(iov);
 647 
 648     return TRUE;
 649 }
 650 
 651 enum crm_ais_msg_types
 652 text2msg_type(const char *text)
     /* [previous][next][first][last][top][bottom][index][help] */
 653 {
 654     int type = crm_msg_none;
 655 
 656     CRM_CHECK(text != NULL, return type);
 657     if (safe_str_eq(text, "ais")) {
 658         type = crm_msg_ais;
 659     } else if (safe_str_eq(text, "crm_plugin")) {
 660         type = crm_msg_ais;
 661     } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
 662         type = crm_msg_cib;
 663     } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
 664         type = crm_msg_crmd;
 665     } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
 666         type = crm_msg_crmd;
 667     } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
 668         type = crm_msg_te;
 669     } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
 670         type = crm_msg_pe;
 671     } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
 672         type = crm_msg_lrmd;
 673     } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
 674         type = crm_msg_stonithd;
 675     } else if (safe_str_eq(text, "stonith-ng")) {
 676         type = crm_msg_stonith_ng;
 677     } else if (safe_str_eq(text, "attrd")) {
 678         type = crm_msg_attrd;
 679 
 680     } else {
 681         /* This will normally be a transient client rather than
 682          * a cluster daemon.  Set the type to the pid of the client
 683          */
 684         int scan_rc = sscanf(text, "%d", &type);
 685 
 686         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
 687             /* Ensure it's sane */
 688             type = crm_msg_none;
 689         }
 690     }
 691     return type;
 692 }

/* [previous][next][first][last][top][bottom][index][help] */