root/lib/common/ipc_server.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. pcmk__ipc_client_count
  2. pcmk__foreach_ipc_client
  3. pcmk__find_client
  4. pcmk__find_client_by_id
  5. pcmk__client_name
  6. pcmk__client_cleanup
  7. pcmk__drop_all_clients
  8. client_from_connection
  9. pcmk__new_unauth_client
  10. pcmk__new_client
  11. pcmk__new_ipc_event
  12. pcmk_free_ipc_event
  13. free_event
  14. add_event
  15. pcmk__free_client
  16. pcmk__set_client_queue_max
  17. pcmk__client_pid
  18. pcmk__client_data2xml
  19. crm_ipcs_flush_events_cb
  20. delay_next_flush
  21. crm_ipcs_flush_events
  22. pcmk__ipc_prepare_iov
  23. pcmk__ipc_send_iov
  24. pcmk__ipc_send_xml
  25. pcmk__ipc_create_ack_as
  26. pcmk__ipc_send_ack_as
  27. pcmk__serve_based_ipc
  28. pcmk__stop_based_ipc
  29. pcmk__serve_controld_ipc
  30. pcmk__serve_attrd_ipc
  31. pcmk__serve_fenced_ipc
  32. pcmk__serve_pacemakerd_ipc
  33. pcmk__serve_schedulerd_ipc

   1 /*
   2  * Copyright 2004-2024 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdio.h>
  13 #include <errno.h>
  14 #include <bzlib.h>
  15 #include <sys/stat.h>
  16 #include <sys/types.h>
  17 
  18 #include <crm/crm.h>
  19 #include <crm/common/xml.h>
  20 #include <crm/common/ipc.h>
  21 #include <crm/common/ipc_internal.h>
  22 #include "crmcommon_private.h"
  23 
  24 /* Evict clients whose event queue grows this large (by default) */
  25 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
  26 
  27 static GHashTable *client_connections = NULL;
  28 
  29 /*!
  30  * \internal
  31  * \brief Count IPC clients
  32  *
  33  * \return Number of active IPC client connections
  34  */
  35 guint
  36 pcmk__ipc_client_count(void)
     /* [previous][next][first][last][top][bottom][index][help] */
  37 {
  38     return client_connections? g_hash_table_size(client_connections) : 0;
  39 }
  40 
  41 /*!
  42  * \internal
  43  * \brief Execute a function for each active IPC client connection
  44  *
  45  * \param[in]     func       Function to call
  46  * \param[in,out] user_data  Pointer to pass to function
  47  *
  48  * \note The parameters are the same as for g_hash_table_foreach().
  49  */
  50 void
  51 pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
  52 {
  53     if ((func != NULL) && (client_connections != NULL)) {
  54         g_hash_table_foreach(client_connections, func, user_data);
  55     }
  56 }
  57 
  58 pcmk__client_t *
  59 pcmk__find_client(const qb_ipcs_connection_t *c)
     /* [previous][next][first][last][top][bottom][index][help] */
  60 {
  61     if (client_connections) {
  62         return g_hash_table_lookup(client_connections, c);
  63     }
  64 
  65     crm_trace("No client found for %p", c);
  66     return NULL;
  67 }
  68 
  69 pcmk__client_t *
  70 pcmk__find_client_by_id(const char *id)
     /* [previous][next][first][last][top][bottom][index][help] */
  71 {
  72     if ((client_connections != NULL) && (id != NULL)) {
  73         gpointer key;
  74         pcmk__client_t *client = NULL;
  75         GHashTableIter iter;
  76 
  77         g_hash_table_iter_init(&iter, client_connections);
  78         while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
  79             if (strcmp(client->id, id) == 0) {
  80                 return client;
  81             }
  82         }
  83     }
  84     crm_trace("No client found with id='%s'", pcmk__s(id, ""));
  85     return NULL;
  86 }
  87 
  88 /*!
  89  * \internal
  90  * \brief Get a client identifier for use in log messages
  91  *
  92  * \param[in] c  Client
  93  *
  94  * \return Client's name, client's ID, or a string literal, as available
  95  * \note This is intended to be used in format strings like "client %s".
  96  */
  97 const char *
  98 pcmk__client_name(const pcmk__client_t *c)
     /* [previous][next][first][last][top][bottom][index][help] */
  99 {
 100     if (c == NULL) {
 101         return "(unspecified)";
 102 
 103     } else if (c->name != NULL) {
 104         return c->name;
 105 
 106     } else if (c->id != NULL) {
 107         return c->id;
 108 
 109     } else {
 110         return "(unidentified)";
 111     }
 112 }
 113 
 114 void
 115 pcmk__client_cleanup(void)
     /* [previous][next][first][last][top][bottom][index][help] */
 116 {
 117     if (client_connections != NULL) {
 118         int active = g_hash_table_size(client_connections);
 119 
 120         if (active > 0) {
 121             crm_warn("Exiting with %d active IPC client%s",
 122                      active, pcmk__plural_s(active));
 123         }
 124         g_hash_table_destroy(client_connections);
 125         client_connections = NULL;
 126     }
 127 }
 128 
 129 void
 130 pcmk__drop_all_clients(qb_ipcs_service_t *service)
     /* [previous][next][first][last][top][bottom][index][help] */
 131 {
 132     qb_ipcs_connection_t *c = NULL;
 133 
 134     if (service == NULL) {
 135         return;
 136     }
 137 
 138     c = qb_ipcs_connection_first_get(service);
 139 
 140     while (c != NULL) {
 141         qb_ipcs_connection_t *last = c;
 142 
 143         c = qb_ipcs_connection_next_get(service, last);
 144 
 145         /* There really shouldn't be anyone connected at this point */
 146         crm_notice("Disconnecting client %p, pid=%d...",
 147                    last, pcmk__client_pid(last));
 148         qb_ipcs_disconnect(last);
 149         qb_ipcs_connection_unref(last);
 150     }
 151 }
 152 
 153 /*!
 154  * \internal
 155  * \brief Allocate a new pcmk__client_t object based on an IPC connection
 156  *
 157  * \param[in] c           IPC connection (NULL to allocate generic client)
 158  * \param[in] key         Connection table key (NULL to use sane default)
 159  * \param[in] uid_client  UID corresponding to c (ignored if c is NULL)
 160  *
 161  * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
 162  */
 163 static pcmk__client_t *
 164 client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
     /* [previous][next][first][last][top][bottom][index][help] */
 165 {
 166     pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));
 167 
 168     if (c) {
 169         client->user = pcmk__uid2username(uid_client);
 170         if (client->user == NULL) {
 171             client->user = pcmk__str_copy("#unprivileged");
 172             crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
 173                     uid_client);
 174         }
 175         client->ipcs = c;
 176         pcmk__set_client_flags(client, pcmk__client_ipc);
 177         client->pid = pcmk__client_pid(c);
 178         if (key == NULL) {
 179             key = c;
 180         }
 181     }
 182 
 183     client->id = crm_generate_uuid();
 184     if (key == NULL) {
 185         key = client->id;
 186     }
 187     if (client_connections == NULL) {
 188         crm_trace("Creating IPC client table");
 189         client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
 190     }
 191     g_hash_table_insert(client_connections, key, client);
 192     return client;
 193 }
 194 
 195 /*!
 196  * \brief Allocate a new pcmk__client_t object and generate its ID
 197  *
 198  * \param[in] key  What to use as connections hash table key (NULL to use ID)
 199  *
 200  * \return Pointer to new pcmk__client_t (asserts on failure)
 201  */
 202 pcmk__client_t *
 203 pcmk__new_unauth_client(void *key)
     /* [previous][next][first][last][top][bottom][index][help] */
 204 {
 205     return client_from_connection(NULL, key, 0);
 206 }
 207 
 208 pcmk__client_t *
 209 pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
     /* [previous][next][first][last][top][bottom][index][help] */
 210 {
 211     gid_t uid_cluster = 0;
 212     gid_t gid_cluster = 0;
 213 
 214     pcmk__client_t *client = NULL;
 215 
 216     CRM_CHECK(c != NULL, return NULL);
 217 
 218     if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
 219         static bool need_log = TRUE;
 220 
 221         if (need_log) {
 222             crm_warn("Could not find user and group IDs for user %s",
 223                      CRM_DAEMON_USER);
 224             need_log = FALSE;
 225         }
 226     }
 227 
 228     if (uid_client != 0) {
 229         crm_trace("Giving group %u access to new IPC connection", gid_cluster);
 230         /* Passing -1 to chown(2) means don't change */
 231         qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
 232     }
 233 
 234     /* TODO: Do our own auth checking, return NULL if unauthorized */
 235     client = client_from_connection(c, NULL, uid_client);
 236 
 237     if ((uid_client == 0) || (uid_client == uid_cluster)) {
 238         /* Remember when a connection came from root or hacluster */
 239         pcmk__set_client_flags(client, pcmk__client_privileged);
 240     }
 241 
 242     crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
 243               client->id, client->pid, uid_client, gid_client);
 244     return client;
 245 }
 246 
 247 static struct iovec *
 248 pcmk__new_ipc_event(void)
     /* [previous][next][first][last][top][bottom][index][help] */
 249 {
 250     return (struct iovec *) pcmk__assert_alloc(2, sizeof(struct iovec));
 251 }
 252 
 253 /*!
 254  * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 255  *
 256  * \param[in,out] event  I/O vector to free
 257  */
 258 void
 259 pcmk_free_ipc_event(struct iovec *event)
     /* [previous][next][first][last][top][bottom][index][help] */
 260 {
 261     if (event != NULL) {
 262         free(event[0].iov_base);
 263         free(event[1].iov_base);
 264         free(event);
 265     }
 266 }
 267 
 268 static void
 269 free_event(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 270 {
 271     pcmk_free_ipc_event((struct iovec *) data);
 272 }
 273 
 274 static void
 275 add_event(pcmk__client_t *c, struct iovec *iov)
     /* [previous][next][first][last][top][bottom][index][help] */
 276 {
 277     if (c->event_queue == NULL) {
 278         c->event_queue = g_queue_new();
 279     }
 280     g_queue_push_tail(c->event_queue, iov);
 281 }
 282 
 283 void
 284 pcmk__free_client(pcmk__client_t *c)
     /* [previous][next][first][last][top][bottom][index][help] */
 285 {
 286     if (c == NULL) {
 287         return;
 288     }
 289 
 290     if (client_connections) {
 291         if (c->ipcs) {
 292             crm_trace("Destroying %p/%p (%d remaining)",
 293                       c, c->ipcs, g_hash_table_size(client_connections) - 1);
 294             g_hash_table_remove(client_connections, c->ipcs);
 295 
 296         } else {
 297             crm_trace("Destroying remote connection %p (%d remaining)",
 298                       c, g_hash_table_size(client_connections) - 1);
 299             g_hash_table_remove(client_connections, c->id);
 300         }
 301     }
 302 
 303     if (c->event_timer) {
 304         g_source_remove(c->event_timer);
 305     }
 306 
 307     if (c->event_queue) {
 308         crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
 309         g_queue_free_full(c->event_queue, free_event);
 310     }
 311 
 312     free(c->id);
 313     free(c->name);
 314     free(c->user);
 315     if (c->remote) {
 316         if (c->remote->auth_timeout) {
 317             g_source_remove(c->remote->auth_timeout);
 318         }
 319         if (c->remote->tls_session != NULL) {
 320             /* @TODO Reduce duplication at callers. Put here everything
 321              * necessary to tear down and free tls_session.
 322              */
 323             gnutls_deinit(c->remote->tls_session);
 324         }
 325         free(c->remote->buffer);
 326         free(c->remote);
 327     }
 328     free(c);
 329 }
 330 
 331 /*!
 332  * \internal
 333  * \brief Raise IPC eviction threshold for a client, if allowed
 334  *
 335  * \param[in,out] client     Client to modify
 336  * \param[in]     qmax       New threshold
 337  */
 338 void
 339 pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
     /* [previous][next][first][last][top][bottom][index][help] */
 340 {
 341     int rc = pcmk_rc_ok;
 342     long long qmax_ll = 0LL;
 343     unsigned int orig_value = 0U;
 344 
 345     CRM_CHECK(client != NULL, return);
 346 
 347     orig_value = client->queue_max;
 348 
 349     if (pcmk_is_set(client->flags, pcmk__client_privileged)) {
 350         rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
 351         if (rc == pcmk_rc_ok) {
 352             if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
 353                 rc = ERANGE;
 354             } else {
 355                 client->queue_max = (unsigned int) qmax_ll;
 356             }
 357         }
 358     } else {
 359         rc = EACCES;
 360     }
 361 
 362     if (rc != pcmk_rc_ok) {
 363         crm_info("Could not set IPC threshold for client %s[%u] to %s: %s",
 364                   pcmk__client_name(client), client->pid,
 365                   pcmk__s(qmax, "default"), pcmk_rc_str(rc));
 366 
 367     } else if (client->queue_max != orig_value) {
 368         crm_debug("IPC threshold for client %s[%u] is now %u (was %u)",
 369                   pcmk__client_name(client), client->pid,
 370                   client->queue_max, orig_value);
 371     }
 372 }
 373 
 374 int
 375 pcmk__client_pid(qb_ipcs_connection_t *c)
     /* [previous][next][first][last][top][bottom][index][help] */
 376 {
 377     struct qb_ipcs_connection_stats stats;
 378 
 379     stats.client_pid = 0;
 380     qb_ipcs_connection_stats_get(c, &stats, 0);
 381     return stats.client_pid;
 382 }
 383 
 384 /*!
 385  * \internal
 386  * \brief Retrieve message XML from data read from client IPC
 387  *
 388  * \param[in,out]  c       IPC client connection
 389  * \param[in]      data    Data read from client connection
 390  * \param[out]     id      Where to store message ID from libqb header
 391  * \param[out]     flags   Where to store flags from libqb header
 392  *
 393  * \return Message XML on success, NULL otherwise
 394  */
 395 xmlNode *
 396 pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id,
     /* [previous][next][first][last][top][bottom][index][help] */
 397                       uint32_t *flags)
 398 {
 399     xmlNode *xml = NULL;
 400     char *uncompressed = NULL;
 401     char *text = ((char *)data) + sizeof(pcmk__ipc_header_t);
 402     pcmk__ipc_header_t *header = data;
 403 
 404     if (!pcmk__valid_ipc_header(header)) {
 405         return NULL;
 406     }
 407 
 408     if (id) {
 409         *id = ((struct qb_ipc_response_header *)data)->id;
 410     }
 411     if (flags) {
 412         *flags = header->flags;
 413     }
 414 
 415     if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
 416         /* Mark this client as being the endpoint of a proxy connection.
 417          * Proxy connections responses are sent on the event channel, to avoid
 418          * blocking the controller serving as proxy.
 419          */
 420         pcmk__set_client_flags(c, pcmk__client_proxied);
 421     }
 422 
 423     if (header->size_compressed) {
 424         int rc = 0;
 425         unsigned int size_u = 1 + header->size_uncompressed;
 426         uncompressed = pcmk__assert_alloc(1, size_u);
 427 
 428         crm_trace("Decompressing message data %u bytes into %u bytes",
 429                   header->size_compressed, size_u);
 430 
 431         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
 432         text = uncompressed;
 433 
 434         rc = pcmk__bzlib2rc(rc);
 435 
 436         if (rc != pcmk_rc_ok) {
 437             crm_err("Decompression failed: %s " QB_XS " rc=%d",
 438                     pcmk_rc_str(rc), rc);
 439             free(uncompressed);
 440             return NULL;
 441         }
 442     }
 443 
 444     pcmk__assert(text[header->size_uncompressed - 1] == 0);
 445 
 446     xml = pcmk__xml_parse(text);
 447     crm_log_xml_trace(xml, "[IPC received]");
 448 
 449     free(uncompressed);
 450     return xml;
 451 }
 452 
 453 static int crm_ipcs_flush_events(pcmk__client_t *c);
 454 
 455 static gboolean
 456 crm_ipcs_flush_events_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 457 {
 458     pcmk__client_t *c = data;
 459 
 460     c->event_timer = 0;
 461     crm_ipcs_flush_events(c);
 462     return FALSE;
 463 }
 464 
 465 /*!
 466  * \internal
 467  * \brief Add progressive delay before next event queue flush
 468  *
 469  * \param[in,out] c          Client connection to add delay to
 470  * \param[in]     queue_len  Current event queue length
 471  */
 472 static inline void
 473 delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
     /* [previous][next][first][last][top][bottom][index][help] */
 474 {
 475     /* Delay a maximum of 1.5 seconds */
 476     guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
 477 
 478     c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
 479 }
 480 
 481 /*!
 482  * \internal
 483  * \brief Send client any messages in its queue
 484  *
 485  * \param[in,out] c  Client to flush
 486  *
 487  * \return Standard Pacemaker return value
 488  */
 489 static int
 490 crm_ipcs_flush_events(pcmk__client_t *c)
     /* [previous][next][first][last][top][bottom][index][help] */
 491 {
 492     int rc = pcmk_rc_ok;
 493     ssize_t qb_rc = 0;
 494     unsigned int sent = 0;
 495     unsigned int queue_len = 0;
 496 
 497     if (c == NULL) {
 498         return rc;
 499 
 500     } else if (c->event_timer) {
 501         /* There is already a timer, wait until it goes off */
 502         crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
 503         return rc;
 504     }
 505 
 506     if (c->event_queue) {
 507         queue_len = g_queue_get_length(c->event_queue);
 508     }
 509     while (sent < 100) {
 510         pcmk__ipc_header_t *header = NULL;
 511         struct iovec *event = NULL;
 512 
 513         if (c->event_queue) {
 514             // We don't pop unless send is successful
 515             event = g_queue_peek_head(c->event_queue);
 516         }
 517         if (event == NULL) { // Queue is empty
 518             break;
 519         }
 520 
 521         qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
 522         if (qb_rc < 0) {
 523             rc = (int) -qb_rc;
 524             break;
 525         }
 526         event = g_queue_pop_head(c->event_queue);
 527 
 528         sent++;
 529         header = event[0].iov_base;
 530         if (header->size_compressed) {
 531             crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
 532                       header->qb.id, c->ipcs, c->pid, (long long) qb_rc);
 533         } else {
 534             crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
 535                       header->qb.id, c->ipcs, c->pid, (long long) qb_rc,
 536                       (char *) (event[1].iov_base));
 537         }
 538         pcmk_free_ipc_event(event);
 539     }
 540 
 541     queue_len -= sent;
 542     if (sent > 0 || queue_len) {
 543         crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
 544                   sent, queue_len, c->ipcs, c->pid,
 545                   pcmk_rc_str(rc), (long long) qb_rc);
 546     }
 547 
 548     if (queue_len) {
 549 
 550         /* Allow clients to briefly fall behind on processing incoming messages,
 551          * but drop completely unresponsive clients so the connection doesn't
 552          * consume resources indefinitely.
 553          */
 554         if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
 555             if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
 556                 /* Don't evict for a new or shrinking backlog */
 557                 crm_warn("Client with process ID %u has a backlog of %u messages "
 558                          QB_XS " %p", c->pid, queue_len, c->ipcs);
 559             } else {
 560                 crm_err("Evicting client with process ID %u due to backlog of %u messages "
 561                          QB_XS " %p", c->pid, queue_len, c->ipcs);
 562                 c->queue_backlog = 0;
 563                 qb_ipcs_disconnect(c->ipcs);
 564                 return rc;
 565             }
 566         }
 567 
 568         c->queue_backlog = queue_len;
 569         delay_next_flush(c, queue_len);
 570 
 571     } else {
 572         /* Event queue is empty, there is no backlog */
 573         c->queue_backlog = 0;
 574     }
 575 
 576     return rc;
 577 }
 578 
 579 /*!
 580  * \internal
 581  * \brief Create an I/O vector for sending an IPC XML message
 582  *
 583  * \param[in]  request        Identifier for libqb response header
 584  * \param[in]  message        XML message to send
 585  * \param[in]  max_send_size  If 0, default IPC buffer size is used
 586  * \param[out] result         Where to store prepared I/O vector
 587  * \param[out] bytes          Size of prepared data in bytes
 588  *
 589  * \return Standard Pacemaker return code
 590  */
 591 int
 592 pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message,
     /* [previous][next][first][last][top][bottom][index][help] */
 593                       uint32_t max_send_size, struct iovec **result,
 594                       ssize_t *bytes)
 595 {
 596     struct iovec *iov;
 597     unsigned int total = 0;
 598     GString *buffer = NULL;
 599     pcmk__ipc_header_t *header = NULL;
 600     int rc = pcmk_rc_ok;
 601 
 602     if ((message == NULL) || (result == NULL)) {
 603         rc = EINVAL;
 604         goto done;
 605     }
 606 
 607     header = calloc(1, sizeof(pcmk__ipc_header_t));
 608     if (header == NULL) {
 609        rc = ENOMEM;
 610        goto done;
 611     }
 612 
 613     buffer = g_string_sized_new(1024);
 614     pcmk__xml_string(message, 0, buffer, 0);
 615 
 616     if (max_send_size == 0) {
 617         max_send_size = crm_ipc_default_buffer_size();
 618     }
 619     CRM_LOG_ASSERT(max_send_size != 0);
 620 
 621     *result = NULL;
 622     iov = pcmk__new_ipc_event();
 623     iov[0].iov_len = sizeof(pcmk__ipc_header_t);
 624     iov[0].iov_base = header;
 625 
 626     header->version = PCMK__IPC_VERSION;
 627     header->size_uncompressed = 1 + buffer->len;
 628     total = iov[0].iov_len + header->size_uncompressed;
 629 
 630     if (total < max_send_size) {
 631         iov[1].iov_base = pcmk__str_copy(buffer->str);
 632         iov[1].iov_len = header->size_uncompressed;
 633 
 634     } else {
 635         static unsigned int biggest = 0;
 636 
 637         char *compressed = NULL;
 638         unsigned int new_size = 0;
 639 
 640         if (pcmk__compress(buffer->str,
 641                            (unsigned int) header->size_uncompressed,
 642                            (unsigned int) max_send_size, &compressed,
 643                            &new_size) == pcmk_rc_ok) {
 644 
 645             pcmk__set_ipc_flags(header->flags, "send data", crm_ipc_compressed);
 646             header->size_compressed = new_size;
 647 
 648             iov[1].iov_len = header->size_compressed;
 649             iov[1].iov_base = compressed;
 650 
 651             biggest = QB_MAX(header->size_compressed, biggest);
 652 
 653         } else {
 654             crm_log_xml_trace(message, "EMSGSIZE");
 655             biggest = QB_MAX(header->size_uncompressed, biggest);
 656 
 657             crm_err("Could not compress %u-byte message into less than IPC "
 658                     "limit of %u bytes; set PCMK_ipc_buffer to higher value "
 659                     "(%u bytes suggested)",
 660                     header->size_uncompressed, max_send_size, 4 * biggest);
 661 
 662             free(compressed);
 663             pcmk_free_ipc_event(iov);
 664             rc = EMSGSIZE;
 665             goto done;
 666         }
 667     }
 668 
 669     header->qb.size = iov[0].iov_len + iov[1].iov_len;
 670     header->qb.id = (int32_t)request;    /* Replying to a specific request */
 671 
 672     *result = iov;
 673     pcmk__assert(header->qb.size > 0);
 674     if (bytes != NULL) {
 675         *bytes = header->qb.size;
 676     }
 677 
 678 done:
 679     if (buffer != NULL) {
 680         g_string_free(buffer, TRUE);
 681     }
 682     return rc;
 683 }
 684 
 685 int
 686 pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
     /* [previous][next][first][last][top][bottom][index][help] */
 687 {
 688     int rc = pcmk_rc_ok;
 689     static uint32_t id = 1;
 690     pcmk__ipc_header_t *header = iov[0].iov_base;
 691 
 692     if (c->flags & pcmk__client_proxied) {
 693         /* _ALL_ replies to proxied connections need to be sent as events */
 694         if (!pcmk_is_set(flags, crm_ipc_server_event)) {
 695             /* The proxied flag lets us know this was originally meant to be a
 696              * response, even though we're sending it over the event channel.
 697              */
 698             pcmk__set_ipc_flags(flags, "server event",
 699                                 crm_ipc_server_event
 700                                 |crm_ipc_proxied_relay_response);
 701         }
 702     }
 703 
 704     pcmk__set_ipc_flags(header->flags, "server event", flags);
 705     if (flags & crm_ipc_server_event) {
 706         header->qb.id = id++;   /* We don't really use it, but doesn't hurt to set one */
 707 
 708         if (flags & crm_ipc_server_free) {
 709             crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
 710             add_event(c, iov);
 711 
 712         } else {
 713             struct iovec *iov_copy = pcmk__new_ipc_event();
 714 
 715             crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
 716             iov_copy[0].iov_len = iov[0].iov_len;
 717             iov_copy[0].iov_base = malloc(iov[0].iov_len);
 718             memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
 719 
 720             iov_copy[1].iov_len = iov[1].iov_len;
 721             iov_copy[1].iov_base = malloc(iov[1].iov_len);
 722             memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
 723 
 724             add_event(c, iov_copy);
 725         }
 726 
 727     } else {
 728         ssize_t qb_rc;
 729 
 730         CRM_LOG_ASSERT(header->qb.id != 0);     /* Replying to a specific request */
 731 
 732         qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
 733         if (qb_rc < header->qb.size) {
 734             if (qb_rc < 0) {
 735                 rc = (int) -qb_rc;
 736             }
 737             crm_notice("Response %d to pid %d failed: %s "
 738                        QB_XS " bytes=%u rc=%lld ipcs=%p",
 739                        header->qb.id, c->pid, pcmk_rc_str(rc),
 740                        header->qb.size, (long long) qb_rc, c->ipcs);
 741 
 742         } else {
 743             crm_trace("Response %d sent, %lld bytes to %p[%d]",
 744                       header->qb.id, (long long) qb_rc, c->ipcs, c->pid);
 745         }
 746 
 747         if (flags & crm_ipc_server_free) {
 748             pcmk_free_ipc_event(iov);
 749         }
 750     }
 751 
 752     if (flags & crm_ipc_server_event) {
 753         rc = crm_ipcs_flush_events(c);
 754     } else {
 755         crm_ipcs_flush_events(c);
 756     }
 757 
 758     if ((rc == EPIPE) || (rc == ENOTCONN)) {
 759         crm_trace("Client %p disconnected", c->ipcs);
 760     }
 761     return rc;
 762 }
 763 
 764 int
 765 pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
     /* [previous][next][first][last][top][bottom][index][help] */
 766                    uint32_t flags)
 767 {
 768     struct iovec *iov = NULL;
 769     int rc = pcmk_rc_ok;
 770 
 771     if (c == NULL) {
 772         return EINVAL;
 773     }
 774     rc = pcmk__ipc_prepare_iov(request, message, crm_ipc_default_buffer_size(),
 775                                &iov, NULL);
 776     if (rc == pcmk_rc_ok) {
 777         pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
 778         rc = pcmk__ipc_send_iov(c, iov, flags);
 779     } else {
 780         pcmk_free_ipc_event(iov);
 781         crm_notice("IPC message to pid %d failed: %s " QB_XS " rc=%d",
 782                    c->pid, pcmk_rc_str(rc), rc);
 783     }
 784     return rc;
 785 }
 786 
 787 /*!
 788  * \internal
 789  * \brief Create an acknowledgement with a status code to send to a client
 790  *
 791  * \param[in] function  Calling function
 792  * \param[in] line      Source file line within calling function
 793  * \param[in] flags     IPC flags to use when sending
 794  * \param[in] tag       Element name to use for acknowledgement
 795  * \param[in] ver       IPC protocol version (can be NULL)
 796  * \param[in] status    Exit status code to add to ack
 797  *
 798  * \return Newly created XML for ack
 799  *
 800  * \note The caller is responsible for freeing the return value with
 801  *       \c pcmk__xml_free().
 802  */
 803 xmlNode *
 804 pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
     /* [previous][next][first][last][top][bottom][index][help] */
 805                         const char *tag, const char *ver, crm_exit_t status)
 806 {
 807     xmlNode *ack = NULL;
 808 
 809     if (pcmk_is_set(flags, crm_ipc_client_response)) {
 810         ack = pcmk__xe_create(NULL, tag);
 811         crm_xml_add(ack, PCMK_XA_FUNCTION, function);
 812         crm_xml_add_int(ack, PCMK__XA_LINE, line);
 813         crm_xml_add_int(ack, PCMK_XA_STATUS, (int) status);
 814         crm_xml_add(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
 815     }
 816     return ack;
 817 }
 818 
 819 /*!
 820  * \internal
 821  * \brief Send an acknowledgement with a status code to a client
 822  *
 823  * \param[in] function  Calling function
 824  * \param[in] line      Source file line within calling function
 825  * \param[in] c         Client to send ack to
 826  * \param[in] request   Request ID being replied to
 827  * \param[in] flags     IPC flags to use when sending
 828  * \param[in] tag       Element name to use for acknowledgement
 829  * \param[in] ver       IPC protocol version (can be NULL)
 830  * \param[in] status    Status code to send with acknowledgement
 831  *
 832  * \return Standard Pacemaker return code
 833  */
 834 int
 835 pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
     /* [previous][next][first][last][top][bottom][index][help] */
 836                       uint32_t request, uint32_t flags, const char *tag,
 837                       const char *ver, crm_exit_t status)
 838 {
 839     int rc = pcmk_rc_ok;
 840     xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver, status);
 841 
 842     if (ack != NULL) {
 843         crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
 844                   pcmk__client_name(c), tag, status);
 845         crm_log_xml_trace(ack, "sent-ack");
 846         c->request_id = 0;
 847         rc = pcmk__ipc_send_xml(c, request, ack, flags);
 848         pcmk__xml_free(ack);
 849     }
 850     return rc;
 851 }
 852 
 853 /*!
 854  * \internal
 855  * \brief Add an IPC server to the main loop for the CIB manager API
 856  *
 857  * \param[out] ipcs_ro   New IPC server for read-only CIB manager API
 858  * \param[out] ipcs_rw   New IPC server for read/write CIB manager API
 859  * \param[out] ipcs_shm  New IPC server for shared-memory CIB manager API
 860  * \param[in]  ro_cb     IPC callbacks for read-only API
 861  * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
 862  *
 863  * \note This function exits fatally if unable to create the servers.
 864  * \note There is no actual difference between the three IPC endpoints other
 865  *       than their names.
 866  */
 867 void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
     /* [previous][next][first][last][top][bottom][index][help] */
 868                            qb_ipcs_service_t **ipcs_rw,
 869                            qb_ipcs_service_t **ipcs_shm,
 870                            struct qb_ipcs_service_handlers *ro_cb,
 871                            struct qb_ipcs_service_handlers *rw_cb)
 872 {
 873     *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
 874                                        QB_IPC_NATIVE, ro_cb);
 875 
 876     *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
 877                                        QB_IPC_NATIVE, rw_cb);
 878 
 879     *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
 880                                         QB_IPC_SHM, rw_cb);
 881 
 882     if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
 883         crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
 884         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
 885         crm_exit(CRM_EX_FATAL);
 886     }
 887 }
 888 
 889 /*!
 890  * \internal
 891  * \brief Destroy IPC servers for the CIB manager API
 892  *
 893  * \param[out] ipcs_ro   IPC server for read-only the CIB manager API
 894  * \param[out] ipcs_rw   IPC server for read/write the CIB manager API
 895  * \param[out] ipcs_shm  IPC server for shared-memory the CIB manager API
 896  *
 897  * \note This is a convenience function for calling qb_ipcs_destroy() for each
 898  *       argument.
 899  */
 900 void
 901 pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
     /* [previous][next][first][last][top][bottom][index][help] */
 902                      qb_ipcs_service_t *ipcs_rw,
 903                      qb_ipcs_service_t *ipcs_shm)
 904 {
 905     qb_ipcs_destroy(ipcs_ro);
 906     qb_ipcs_destroy(ipcs_rw);
 907     qb_ipcs_destroy(ipcs_shm);
 908 }
 909 
 910 /*!
 911  * \internal
 912  * \brief Add an IPC server to the main loop for the controller API
 913  *
 914  * \param[in] cb  IPC callbacks
 915  *
 916  * \return Newly created IPC server
 917  */
 918 qb_ipcs_service_t *
 919 pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
     /* [previous][next][first][last][top][bottom][index][help] */
 920 {
 921     return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
 922 }
 923 
 924 /*!
 925  * \internal
 926  * \brief Add an IPC server to the main loop for the attribute manager API
 927  *
 928  * \param[out] ipcs  Where to store newly created IPC server
 929  * \param[in] cb  IPC callbacks
 930  *
 931  * \note This function exits fatally if unable to create the servers.
 932  */
 933 void
 934 pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
     /* [previous][next][first][last][top][bottom][index][help] */
 935                       struct qb_ipcs_service_handlers *cb)
 936 {
 937     *ipcs = mainloop_add_ipc_server(PCMK__VALUE_ATTRD, QB_IPC_NATIVE, cb);
 938 
 939     if (*ipcs == NULL) {
 940         crm_crit("Exiting fatally because unable to serve " PCMK__SERVER_ATTRD
 941                  " IPC (verify pacemaker and pacemaker_remote are not both "
 942                  "enabled)");
 943         crm_exit(CRM_EX_FATAL);
 944     }
 945 }
 946 
 947 /*!
 948  * \internal
 949  * \brief Add an IPC server to the main loop for the fencer API
 950  *
 951  * \param[out] ipcs  Where to store newly created IPC server
 952  * \param[in]  cb    IPC callbacks
 953  *
 954  * \note This function exits fatally if unable to create the servers.
 955  */
 956 void
 957 pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
     /* [previous][next][first][last][top][bottom][index][help] */
 958                        struct qb_ipcs_service_handlers *cb)
 959 {
 960     *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
 961                                               QB_LOOP_HIGH);
 962 
 963     if (*ipcs == NULL) {
 964         crm_err("Failed to create fencer: exiting and inhibiting respawn.");
 965         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
 966         crm_exit(CRM_EX_FATAL);
 967     }
 968 }
 969 
 970 /*!
 971  * \internal
 972  * \brief Add an IPC server to the main loop for the pacemakerd API
 973  *
 974  * \param[out] ipcs  Where to store newly created IPC server
 975  * \param[in]  cb    IPC callbacks
 976  *
 977  * \note This function exits with CRM_EX_OSERR if unable to create the servers.
 978  */
 979 void
 980 pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
     /* [previous][next][first][last][top][bottom][index][help] */
 981                        struct qb_ipcs_service_handlers *cb)
 982 {
 983     *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);
 984 
 985     if (*ipcs == NULL) {
 986         crm_err("Couldn't start pacemakerd IPC server");
 987         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
 988         /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
 989          * if we want to prevent pacemakerd from restarting them.
 990          * With pacemakerd we leave the exit-code shown to e.g. systemd
 991          * to what it was prior to moving the code here from pacemakerd.c
 992          */
 993         crm_exit(CRM_EX_OSERR);
 994     }
 995 }
 996 
 997 /*!
 998  * \internal
 999  * \brief Add an IPC server to the main loop for the scheduler API
1000  *
1001  * \param[in] cb  IPC callbacks
1002  *
1003  * \return Newly created IPC server
1004  * \note This function exits fatally if unable to create the servers.
1005  */
1006 qb_ipcs_service_t *
1007 pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
     /* [previous][next][first][last][top][bottom][index][help] */
1008 {
1009     return mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, cb);
1010 }

/* [previous][next][first][last][top][bottom][index][help] */