lib/common/ipc_server.c


DEFINITIONS

This source file includes the following definitions.
  1. pcmk__ipc_client_count
  2. pcmk__foreach_ipc_client
  3. pcmk__find_client
  4. pcmk__find_client_by_id
  5. pcmk__client_name
  6. pcmk__client_cleanup
  7. pcmk__drop_all_clients
  8. client_from_connection
  9. pcmk__new_unauth_client
  10. pcmk__new_client
  11. pcmk__new_ipc_event
  12. pcmk_free_ipc_event
  13. free_event
  14. add_event
  15. pcmk__free_client
  16. pcmk__set_client_queue_max
  17. pcmk__client_pid
  18. pcmk__client_data2xml
  19. crm_ipcs_flush_events_cb
  20. delay_next_flush
  21. crm_ipcs_flush_events
  22. pcmk__ipc_prepare_iov
  23. id_for_server_event
  24. pcmk__ipc_send_iov
  25. pcmk__ipc_send_xml
  26. pcmk__ipc_create_ack_as
  27. pcmk__ipc_send_ack_as
  28. pcmk__serve_based_ipc
  29. pcmk__stop_based_ipc
  30. pcmk__serve_controld_ipc
  31. pcmk__serve_attrd_ipc
  32. pcmk__serve_fenced_ipc
  33. pcmk__serve_pacemakerd_ipc
  34. pcmk__serve_schedulerd_ipc

/*
 * Copyright 2004-2025 the Pacemaker project contributors
 *
 * The version control history for this file may have further details.
 *
 * This source code is licensed under the GNU Lesser General Public License
 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
 */

#include <crm_internal.h>

#include <stdio.h>
#include <errno.h>
#include <bzlib.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/common/ipc.h>
#include <crm/common/ipc_internal.h>
#include "crmcommon_private.h"

/* Evict clients whose event queue grows this large (by default) */
#define PCMK_IPC_DEFAULT_QUEUE_MAX 500

static GHashTable *client_connections = NULL;

/*!
 * \internal
 * \brief Count IPC clients
 *
 * \return Number of active IPC client connections
 */
guint
pcmk__ipc_client_count(void)
{
    return client_connections? g_hash_table_size(client_connections) : 0;
}

/*!
 * \internal
 * \brief Execute a function for each active IPC client connection
 *
 * \param[in]     func       Function to call
 * \param[in,out] user_data  Pointer to pass to function
 *
 * \note The parameters are the same as for g_hash_table_foreach().
 */
void
pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
{
    if ((func != NULL) && (client_connections != NULL)) {
        g_hash_table_foreach(client_connections, func, user_data);
    }
}
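
/* Example (illustrative sketch, not a library function): a GHFunc that could
 * be passed to pcmk__foreach_ipc_client(). The connection table maps keys to
 * pcmk__client_t pointers, so the value argument carries the client.
 * log_ipc_client is a hypothetical name used only for this example.
 */
static void
log_ipc_client(gpointer key, gpointer value, gpointer user_data)
{
    pcmk__client_t *client = value;

    // Log each active client's name (or ID) and process ID
    crm_info("Active IPC client %s [%u]",
             pcmk__client_name(client), client->pid);
}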

pcmk__client_t *
pcmk__find_client(const qb_ipcs_connection_t *c)
{
    if (client_connections) {
        return g_hash_table_lookup(client_connections, c);
    }

    crm_trace("No client found for %p", c);
    return NULL;
}

pcmk__client_t *
pcmk__find_client_by_id(const char *id)
{
    if ((client_connections != NULL) && (id != NULL)) {
        gpointer key;
        pcmk__client_t *client = NULL;
        GHashTableIter iter;

        g_hash_table_iter_init(&iter, client_connections);
        while (g_hash_table_iter_next(&iter, &key, (gpointer *) &client)) {
            if (strcmp(client->id, id) == 0) {
                return client;
            }
        }
    }
    crm_trace("No client found with id='%s'", pcmk__s(id, ""));
    return NULL;
}

/*!
 * \internal
 * \brief Get a client identifier for use in log messages
 *
 * \param[in] c  Client
 *
 * \return Client's name, client's ID, or a string literal, as available
 * \note This is intended to be used in format strings like "client %s".
 */
const char *
pcmk__client_name(const pcmk__client_t *c)
{
    if (c == NULL) {
        return "(unspecified)";

    } else if (c->name != NULL) {
        return c->name;

    } else if (c->id != NULL) {
        return c->id;

    } else {
        return "(unidentified)";
    }
}
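
/* Example (illustrative sketch, not a library function): because
 * pcmk__client_name() always returns a printable string, log calls can use
 * it directly in a format string without any NULL checks of their own.
 */
static void
log_client_disconnect(const pcmk__client_t *client)
{
    // Safe even if client itself, its name, and its ID are all NULL
    crm_info("Client %s disconnected", pcmk__client_name(client));
}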

void
pcmk__client_cleanup(void)
{
    if (client_connections != NULL) {
        int active = g_hash_table_size(client_connections);

        if (active > 0) {
            crm_warn("Exiting with %d active IPC client%s",
                     active, pcmk__plural_s(active));
        }
        g_hash_table_destroy(client_connections);
        client_connections = NULL;
    }
}

void
pcmk__drop_all_clients(qb_ipcs_service_t *service)
{
    qb_ipcs_connection_t *c = NULL;

    if (service == NULL) {
        return;
    }

    c = qb_ipcs_connection_first_get(service);

    while (c != NULL) {
        qb_ipcs_connection_t *last = c;

        c = qb_ipcs_connection_next_get(service, last);

        /* There really shouldn't be anyone connected at this point */
        crm_notice("Disconnecting client %p, pid=%d...",
                   last, pcmk__client_pid(last));
        qb_ipcs_disconnect(last);
        qb_ipcs_connection_unref(last);
    }
}

/*!
 * \internal
 * \brief Allocate a new pcmk__client_t object based on an IPC connection
 *
 * \param[in] c           IPC connection (NULL to allocate generic client)
 * \param[in] key         Connection table key (NULL to use sane default)
 * \param[in] uid_client  UID corresponding to c (ignored if c is NULL)
 *
 * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
 */
static pcmk__client_t *
client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
{
    pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));

    if (c) {
        client->user = pcmk__uid2username(uid_client);
        if (client->user == NULL) {
            client->user = pcmk__str_copy("#unprivileged");
            crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
                    uid_client);
        }
        client->ipcs = c;
        pcmk__set_client_flags(client, pcmk__client_ipc);
        client->pid = pcmk__client_pid(c);
        if (key == NULL) {
            key = c;
        }
    }

    client->id = crm_generate_uuid();
    if (key == NULL) {
        key = client->id;
    }
    if (client_connections == NULL) {
        crm_trace("Creating IPC client table");
        client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
    }
    g_hash_table_insert(client_connections, key, client);
    return client;
}

/*!
 * \brief Allocate a new pcmk__client_t object and generate its ID
 *
 * \param[in] key  What to use as connections hash table key (NULL to use ID)
 *
 * \return Pointer to new pcmk__client_t (asserts on failure)
 */
pcmk__client_t *
pcmk__new_unauth_client(void *key)
{
    return client_from_connection(NULL, key, 0);
}

pcmk__client_t *
pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
{
    uid_t uid_cluster = 0;
    gid_t gid_cluster = 0;

    pcmk__client_t *client = NULL;

    CRM_CHECK(c != NULL, return NULL);

    if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
        static bool need_log = TRUE;

        if (need_log) {
            crm_warn("Could not find user and group IDs for user %s",
                     CRM_DAEMON_USER);
            need_log = FALSE;
        }
    }

    if (uid_client != 0) {
        crm_trace("Giving group %u access to new IPC connection", gid_cluster);
        /* Passing -1 to chown(2) means don't change */
        qb_ipcs_connection_auth_set(c, -1, gid_cluster,
                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    }

    /* TODO: Do our own auth checking, return NULL if unauthorized */
    client = client_from_connection(c, NULL, uid_client);

    if ((uid_client == 0) || (uid_client == uid_cluster)) {
        /* Remember when a connection came from root or hacluster */
        pcmk__set_client_flags(client, pcmk__client_privileged);
    }

    crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
              client->id, client->pid, uid_client, gid_client);
    return client;
}
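
/* Example (illustrative sketch, not a library function): how a daemon's libqb
 * connection-accept handler might call pcmk__new_client(). The handler name
 * is hypothetical; its signature matches libqb's
 * qb_ipcs_service_handlers.connection_accept callback.
 */
static int32_t
example_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    if (pcmk__new_client(c, uid, gid) == NULL) {
        return -ENOMEM; // reject the connection
    }
    return 0; // accept the connection
}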

static struct iovec *
pcmk__new_ipc_event(void)
{
    return (struct iovec *) pcmk__assert_alloc(2, sizeof(struct iovec));
}

/*!
 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 *
 * \param[in,out] event  I/O vector to free
 */
void
pcmk_free_ipc_event(struct iovec *event)
{
    if (event != NULL) {
        free(event[0].iov_base);
        free(event[1].iov_base);
        free(event);
    }
}

static void
free_event(gpointer data)
{
    pcmk_free_ipc_event((struct iovec *) data);
}

static void
add_event(pcmk__client_t *c, struct iovec *iov)
{
    if (c->event_queue == NULL) {
        c->event_queue = g_queue_new();
    }
    g_queue_push_tail(c->event_queue, iov);
}

void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections) {
        if (c->ipcs) {
            crm_trace("Destroying %p/%p (%d remaining)",
                      c, c->ipcs, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            crm_trace("Destroying remote connection %p (%d remaining)",
                      c, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->id);
        }
    }

    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    if (c->event_queue) {
        crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->buffer != NULL) {
        g_byte_array_free(c->buffer, TRUE);
        c->buffer = NULL;
    }

    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}

/*!
 * \internal
 * \brief Raise IPC eviction threshold for a client, if allowed
 *
 * \param[in,out] client  Client to modify
 * \param[in]     qmax    New threshold
 */
void
pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
{
    int rc = pcmk_rc_ok;
    long long qmax_ll = 0LL;
    unsigned int orig_value = 0U;

    CRM_CHECK(client != NULL, return);

    orig_value = client->queue_max;

    if (pcmk_is_set(client->flags, pcmk__client_privileged)) {
        rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
        if (rc == pcmk_rc_ok) {
            if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
                rc = ERANGE;
            } else {
                client->queue_max = (unsigned int) qmax_ll;
            }
        }
    } else {
        rc = EACCES;
    }

    if (rc != pcmk_rc_ok) {
        crm_info("Could not set IPC threshold for client %s[%u] to %s: %s",
                 pcmk__client_name(client), client->pid,
                 pcmk__s(qmax, "default"), pcmk_rc_str(rc));

    } else if (client->queue_max != orig_value) {
        crm_debug("IPC threshold for client %s[%u] is now %u (was %u)",
                  pcmk__client_name(client), client->pid,
                  client->queue_max, orig_value);
    }
}
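
/* Example (illustrative sketch, not a library function): letting a privileged
 * client raise its own eviction threshold from a request it sent. The
 * "queue-max" attribute name is hypothetical, chosen only for this example.
 */
static void
example_apply_queue_max(pcmk__client_t *client, xmlNode *request)
{
    const char *qmax = crm_element_value(request, "queue-max");

    if (qmax != NULL) {
        // Refused (and logged) if the client is unprivileged
        pcmk__set_client_queue_max(client, qmax);
    }
}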

int
pcmk__client_pid(qb_ipcs_connection_t *c)
{
    struct qb_ipcs_connection_stats stats;

    stats.client_pid = 0;
    qb_ipcs_connection_stats_get(c, &stats, 0);
    return stats.client_pid;
}

/*!
 * \internal
 * \brief Retrieve message XML from data read from client IPC
 *
 * \param[in,out]  c       IPC client connection
 * \param[out]     id      Where to store message ID from libqb header
 * \param[out]     flags   Where to store flags from libqb header
 *
 * \return Message XML on success, NULL otherwise
 */
xmlNode *
pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
{
    xmlNode *xml = NULL;
    pcmk__ipc_header_t *header = (void *) c->buffer->data;
    char *text = (char *) header + sizeof(pcmk__ipc_header_t);

    if (!pcmk__valid_ipc_header(header)) {
        return NULL;
    }

    if (id) {
        *id = header->qb.id;
    }

    if (flags) {
        *flags = header->flags;
    }

    if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
        /* Mark this client as being the endpoint of a proxy connection.
         * Responses to proxied connections are sent on the event channel, to
         * avoid blocking the controller serving as proxy.
         */
        pcmk__set_client_flags(c, pcmk__client_proxied);
    }

    pcmk__assert(text[header->size - 1] == 0);

    xml = pcmk__xml_parse(text);
    crm_log_xml_trace(xml, "[IPC received]");
    return xml;
}
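
/* Example (illustrative sketch, not a library function): typical use in a
 * daemon's dispatch path once an incoming message has been accumulated in
 * client->buffer. Dispatching the parsed request is daemon-specific and
 * omitted here.
 */
static void
example_handle_message(pcmk__client_t *client)
{
    uint32_t id = 0;
    uint32_t flags = 0;
    xmlNode *request = pcmk__client_data2xml(client, &id, &flags);

    if (request == NULL) {
        crm_warn("Dropping malformed IPC message from client %s",
                 pcmk__client_name(client));
        return;
    }
    // ... process the request, then reply using id and flags ...
    pcmk__xml_free(request);
}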

static int crm_ipcs_flush_events(pcmk__client_t *c);

static gboolean
crm_ipcs_flush_events_cb(gpointer data)
{
    pcmk__client_t *c = data;

    c->event_timer = 0;
    crm_ipcs_flush_events(c);
    return FALSE;
}

/*!
 * \internal
 * \brief Add progressive delay before next event queue flush
 *
 * \param[in,out] c          Client connection to add delay to
 * \param[in]     queue_len  Current event queue length
 */
static inline void
delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
{
    /* Delay a maximum of 1.5 seconds */
    guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;

    c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
}
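
/* For example: a backlog of 1 queued event delays the next flush by 1.1s, a
 * backlog of 4 by 1.4s, and anything of 5 or more is capped at the 1.5s
 * maximum.
 */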

/*!
 * \internal
 * \brief Send client any messages in its queue
 *
 * \param[in,out] c  Client to flush
 *
 * \return Standard Pacemaker return value
 */
static int
crm_ipcs_flush_events(pcmk__client_t *c)
{
    int rc = pcmk_rc_ok;
    ssize_t qb_rc = 0;
    unsigned int sent = 0;
    unsigned int queue_len = 0;

    if (c == NULL) {
        return rc;

    } else if (c->event_timer) {
        /* There is already a timer, wait until it goes off */
        crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
        return rc;
    }

    if (c->event_queue) {
        queue_len = g_queue_get_length(c->event_queue);
    }
    while (sent < 100) {
        pcmk__ipc_header_t *header = NULL;
        struct iovec *event = NULL;

        if (c->event_queue) {
            // We don't pop unless send is successful
            event = g_queue_peek_head(c->event_queue);
        }
        if (event == NULL) { // Queue is empty
            break;
        }

        /* Retry sending the event up to five times.  If we get -EAGAIN, sleep
         * a very short amount of time (too long here is bad) and try again.
         * If we simply exit the while loop on -EAGAIN, we'll have to wait until
         * the timer fires off again (up to 1.5 seconds - see delay_next_flush)
         * to retry sending the message.
         *
         * In that case, the queue may just continue to grow faster than we are
         * processing it, eventually leading to daemons timing out waiting for
         * replies, which will cause wider failures.
         */
        for (unsigned int retries = 5; retries > 0; retries--) {
            qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);

            if (qb_rc < 0) {
                if (retries == 1 || qb_rc != -EAGAIN) {
                    rc = (int) -qb_rc;
                    goto no_more_retries;
                } else {
                    pcmk__sleep_ms(5);
                }
            } else {
                break;
            }
        }

        event = g_queue_pop_head(c->event_queue);

        sent++;
        header = event[0].iov_base;
        crm_trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
                  header->qb.id, c->ipcs, c->pid, qb_rc,
                  (char *) (event[1].iov_base));
        pcmk_free_ipc_event(event);
    }

no_more_retries:
    queue_len -= sent;
    if (sent > 0 || queue_len) {
        crm_trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)",
                  sent, queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
    }

    if (queue_len) {

        /* Allow clients to briefly fall behind on processing incoming messages,
         * but drop completely unresponsive clients so the connection doesn't
         * consume resources indefinitely.
         */
        if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
            if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
                /* Don't evict for a new or shrinking backlog */
                crm_warn("Client with process ID %u has a backlog of %u messages "
                         QB_XS " %p", c->pid, queue_len, c->ipcs);
            } else {
                crm_err("Evicting client with process ID %u due to backlog of "
                        "%u messages " QB_XS " %p", c->pid, queue_len, c->ipcs);
                c->queue_backlog = 0;
                qb_ipcs_disconnect(c->ipcs);
                return rc;
            }
        }

        c->queue_backlog = queue_len;
        delay_next_flush(c, queue_len);

    } else {
        /* Event queue is empty, there is no backlog */
        c->queue_backlog = 0;
    }

    return rc;
}

/*!
 * \internal
 * \brief Create an I/O vector for sending an IPC XML message
 *
 * If the message is too large to fit into a single buffer, this function will
 * prepare an I/O vector that only holds as much as fits.  The remainder can be
 * prepared in a separate call by keeping a running count of the number of times
 * this function has been called and passing that in for \p index.
 *
 * \param[in]  request  Identifier for libqb response header
 * \param[in]  message  Message to send
 * \param[in]  index    How many times this function has been called -
 *                      basically, a count of how many chunks of \p message
 *                      have already been sent
 * \param[out] result   Where to store prepared I/O vector - NULL on error
 * \param[out] bytes    Size of prepared data in bytes (includes header)
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index,
                      struct iovec **result, ssize_t *bytes)
{
    struct iovec *iov = NULL;
    unsigned int payload_size = 0;
    unsigned int total = 0;
    unsigned int max_send_size = crm_ipc_default_buffer_size();
    unsigned int max_chunk_size = 0;
    size_t offset = 0;
    pcmk__ipc_header_t *header = NULL;
    int rc = pcmk_rc_ok;

    if ((message == NULL) || (result == NULL)) {
        rc = EINVAL;
        goto done;
    }

    header = calloc(1, sizeof(pcmk__ipc_header_t));
    if (header == NULL) {
        rc = ENOMEM;
        goto done;
    }

    *result = NULL;
    iov = pcmk__new_ipc_event();
    iov[0].iov_len = sizeof(pcmk__ipc_header_t);
    iov[0].iov_base = header;

    header->version = PCMK__IPC_VERSION;

    /* We are passed an index, which is basically how many times this function
     * has been called.  This is how we support multi-part IPC messages.  We
     * need to convert that into an offset into the buffer that we want to start
     * reading from.
     *
     * Each call to this function can send max_send_size, but this also includes
     * the header and a null terminator character for the end of the payload.
     * We need to subtract those out here.
     */
    max_chunk_size = max_send_size - iov[0].iov_len - 1;
    offset = index * max_chunk_size;

    /* How much of message is left to send?  This does not include the null
     * terminator character.
     */
    payload_size = message->len - offset;

    /* How much would be transmitted, including the header size and null
     * terminator character for the buffer?
     */
    total = iov[0].iov_len + payload_size + 1;

    if (total >= max_send_size) {
        /* The entire packet is too big to fit in a single buffer.  Calculate
         * how much of it we can send - buffer size, minus header size, minus
         * one for the null terminator.
         */
        payload_size = max_chunk_size;

        header->size = payload_size + 1;

        iov[1].iov_base = strndup(message->str + offset, payload_size);
        if (iov[1].iov_base == NULL) {
            rc = ENOMEM;
            goto done;
        }

        iov[1].iov_len = header->size;
        rc = pcmk_rc_ipc_more;

    } else {
        /* The entire packet fits in a single buffer.  We can copy the entirety
         * of it into the payload.
         */
        header->size = payload_size + 1;

        iov[1].iov_base = pcmk__str_copy(message->str + offset);
        iov[1].iov_len = header->size;
    }

    header->part_id = index;
    header->qb.size = iov[0].iov_len + iov[1].iov_len;
    header->qb.id = (int32_t) request;    /* Replying to a specific request */

    if ((rc == pcmk_rc_ok) && (index != 0)) {
        pcmk__set_ipc_flags(header->flags, "multipart ipc",
                            crm_ipc_multipart | crm_ipc_multipart_end);
    } else if (rc == pcmk_rc_ipc_more) {
        pcmk__set_ipc_flags(header->flags, "multipart ipc",
                            crm_ipc_multipart);
    }

    *result = iov;
    pcmk__assert(header->qb.size > 0);
    if (bytes != NULL) {
        *bytes = header->qb.size;
    }

done:
    if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) {
        pcmk_free_ipc_event(iov);
    }

    return rc;
}
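
/* Example (illustrative sketch, not a library function): preparing and then
 * discarding a single chunk.  A message larger than one IPC buffer is handled
 * by looping and incrementing index for as long as this returns
 * pcmk_rc_ipc_more, which is exactly what pcmk__ipc_send_xml() below does.
 */
static int
example_prepare_one_chunk(uint32_t request, const GString *message)
{
    struct iovec *iov = NULL;
    ssize_t bytes = 0;
    int rc = pcmk__ipc_prepare_iov(request, message, 0, &iov, &bytes);

    if ((rc == pcmk_rc_ok) || (rc == pcmk_rc_ipc_more)) {
        crm_trace("Prepared %zd bytes including the IPC header", bytes);
        pcmk_free_ipc_event(iov); // a real caller would send it first
    }
    return rc;
}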

/* Return the next available ID for a server event.
 *
 * For the parts of a multipart event, all parts should have the same ID as
 * the first part.
 */
static uint32_t
id_for_server_event(pcmk__ipc_header_t *header)
{
    static uint32_t id = 1;

    if (pcmk_is_set(header->flags, crm_ipc_multipart)
        && (header->part_id != 0)) {
        return id;
    } else {
        id++;
        return id;
    }
}

int
pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
{
    int rc = pcmk_rc_ok;
    pcmk__ipc_header_t *header = iov[0].iov_base;

    /* _ALL_ replies to proxied connections need to be sent as events */
    if (pcmk_is_set(c->flags, pcmk__client_proxied)
        && !pcmk_is_set(flags, crm_ipc_server_event)) {
        /* The proxied flag lets us know this was originally meant to be a
         * response, even though we're sending it over the event channel.
         */
        pcmk__set_ipc_flags(flags, "server event",
                            crm_ipc_server_event|crm_ipc_proxied_relay_response);
    }

    pcmk__set_ipc_flags(header->flags, "server event", flags);
    if (pcmk_is_set(flags, crm_ipc_server_event)) {
        /* Server events don't use an ID, though we do set one in
         * pcmk__ipc_prepare_iov if the event is in response to a particular
         * request.  In that case, we don't want to set a new ID here that
         * overwrites that one.
         *
         * @TODO: Since server event IDs aren't used anywhere, do we really
         * need to set this for any reason other than ease of logging?
         */
        if (header->qb.id == 0) {
            header->qb.id = id_for_server_event(header);
        }

        if (pcmk_is_set(flags, crm_ipc_server_free)) {
            crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
            add_event(c, iov);

        } else {
            struct iovec *iov_copy = pcmk__new_ipc_event();

            crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
            iov_copy[0].iov_len = iov[0].iov_len;
            iov_copy[0].iov_base = malloc(iov[0].iov_len);
            memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);

            iov_copy[1].iov_len = iov[1].iov_len;
            iov_copy[1].iov_base = malloc(iov[1].iov_len);
            memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);

            add_event(c, iov_copy);
        }

        rc = crm_ipcs_flush_events(c);

    } else {
        ssize_t qb_rc;
        char *part_text = NULL;

        CRM_LOG_ASSERT(header->qb.id != 0);     /* Replying to a specific request */

        if (pcmk_is_set(header->flags, crm_ipc_multipart_end)) {
            part_text = crm_strdup_printf(" (final part %d) ", header->part_id);
        } else if (pcmk_is_set(header->flags, crm_ipc_multipart)) {
            if (header->part_id == 0) {
                part_text = crm_strdup_printf(" (initial part %d) ",
                                              header->part_id);
            } else {
                part_text = crm_strdup_printf(" (part %d) ", header->part_id);
            }
        } else {
            part_text = crm_strdup_printf(" ");
        }

        qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
        if (qb_rc < header->qb.size) {
            if (qb_rc < 0) {
                rc = (int) -qb_rc;
            }

            crm_notice("Response %" PRId32 "%sto pid %u failed: %s "
                       QB_XS " bytes=%" PRId32 " rc=%zd ipcs=%p",
                       header->qb.id, part_text, c->pid, pcmk_rc_str(rc),
                       header->qb.size, qb_rc, c->ipcs);

        } else {
            crm_trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]",
                      header->qb.id, part_text, qb_rc, c->ipcs, c->pid);
            crm_trace("Text = %s", (char *) iov[1].iov_base);
        }

        free(part_text);

        if (pcmk_is_set(flags, crm_ipc_server_free)) {
            pcmk_free_ipc_event(iov);
        }

        crm_ipcs_flush_events(c);
    }

    if ((rc == EPIPE) || (rc == ENOTCONN)) {
        crm_trace("Client %p disconnected", c->ipcs);
    }

    return rc;
}

int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;
    GString *iov_buffer = NULL;
    uint16_t index = 0;
    bool event_or_proxied = false;

    if (c == NULL) {
        return EINVAL;
    }

    iov_buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, iov_buffer, 0);

    /* Testing crm_ipc_server_event is obvious.  pcmk__client_proxied is less
     * obvious.  According to pcmk__ipc_send_iov, replies to proxied connections
     * need to be sent as events.  However, do_local_notify (which calls this
     * function) will clear all flags, so we can't go just by
     * crm_ipc_server_event.
     *
     * Changing do_local_notify to check for a proxied connection first results
     * in processes on the Pacemaker Remote node (like cibadmin or crm_mon)
     * timing out when waiting for a reply.
     */
    event_or_proxied = pcmk_is_set(flags, crm_ipc_server_event)
                       || pcmk_is_set(c->flags, pcmk__client_proxied);

    do {
        rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL);

        switch (rc) {
            case pcmk_rc_ok:
                /* No more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                if (event_or_proxied) {
                    if (rc == EAGAIN) {
                        /* Return pcmk_rc_ok instead so callers don't have to
                         * know whether they passed an event or not when
                         * interpreting the return code.
                         */
                        rc = pcmk_rc_ok;
                    }
                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == EAGAIN) {
                        break;
                    }
                }

                goto done;

            case pcmk_rc_ipc_more:
                /* There are more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                /* Did an error occur during transmission? */
                if (event_or_proxied) {
                    /* EAGAIN is not an error for server events.  The event
                     * will be queued for transmission and we will attempt
                     * sending it again the next time pcmk__ipc_send_iov is
                     * called, or when the crm_ipcs_flush_events_cb happens.
                     */
                    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
                        goto done;
                    }

                    index++;
                    break;

                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == pcmk_rc_ok) {
                        index++;
                        break;
                    } else if (rc == EAGAIN) {
                        break;
                    } else {
                        goto done;
                    }
                }

            default:
                /* An error occurred during preparation */
                goto done;
        }
    } while (true);

done:
    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
        crm_notice("IPC message to pid %u failed: %s " QB_XS " rc=%d",
                   c->pid, pcmk_rc_str(rc), rc);
    }

    g_string_free(iov_buffer, TRUE);
    return rc;
}
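
/* Example (illustrative sketch, not a library function): replying to a
 * request from a dispatch handler.  The id and flags values would normally
 * come from pcmk__client_data2xml() above; reply is a daemon-built XML
 * message.
 */
static void
example_send_reply(pcmk__client_t *client, uint32_t id, uint32_t flags,
                   const xmlNode *reply)
{
    int rc = pcmk__ipc_send_xml(client, id, reply, flags);

    if (rc != pcmk_rc_ok) {
        crm_warn("Could not reply to client %s: %s",
                 pcmk__client_name(client), pcmk_rc_str(rc));
    }
}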

/*!
 * \internal
 * \brief Create an acknowledgement with a status code to send to a client
 *
 * \param[in] function  Calling function
 * \param[in] line      Source file line within calling function
 * \param[in] flags     IPC flags to use when sending
 * \param[in] tag       Element name to use for acknowledgement
 * \param[in] ver       IPC protocol version (can be NULL)
 * \param[in] status    Exit status code to add to ack
 *
 * \return Newly created XML for ack
 *
 * \note The caller is responsible for freeing the return value with
 *       \c pcmk__xml_free().
 */
xmlNode *
pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
                        const char *tag, const char *ver, crm_exit_t status)
{
    xmlNode *ack = NULL;

    if (pcmk_is_set(flags, crm_ipc_client_response)) {
        ack = pcmk__xe_create(NULL, tag);
        crm_xml_add(ack, PCMK_XA_FUNCTION, function);
        crm_xml_add_int(ack, PCMK__XA_LINE, line);
        crm_xml_add_int(ack, PCMK_XA_STATUS, (int) status);
        crm_xml_add(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
    }
    return ack;
}

/*!
 * \internal
 * \brief Send an acknowledgement with a status code to a client
 *
 * \param[in] function  Calling function
 * \param[in] line      Source file line within calling function
 * \param[in] c         Client to send ack to
 * \param[in] request   Request ID being replied to
 * \param[in] flags     IPC flags to use when sending
 * \param[in] tag       Element name to use for acknowledgement
 * \param[in] ver       IPC protocol version (can be NULL)
 * \param[in] status    Status code to send with acknowledgement
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
                      uint32_t request, uint32_t flags, const char *tag,
                      const char *ver, crm_exit_t status)
{
    int rc = pcmk_rc_ok;
    xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver,
                                           status);

    if (ack != NULL) {
        crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
                  pcmk__client_name(c), tag, status);
        crm_log_xml_trace(ack, "sent-ack");
        c->request_id = 0;
        rc = pcmk__ipc_send_xml(c, request, ack, flags);
        pcmk__xml_free(ack);
    }
    return rc;
}
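
/* Example (illustrative sketch, not a library function): acknowledging a
 * request with an OK status.  The "ack" element name is hypothetical (daemons
 * define their own tag constants); note that an ack is only created when
 * flags include crm_ipc_client_response.
 */
static int
example_send_ack(pcmk__client_t *client, uint32_t request_id, uint32_t flags)
{
    return pcmk__ipc_send_ack_as(__func__, __LINE__, client, request_id,
                                 flags, "ack", NULL, CRM_EX_OK);
}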

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the CIB manager API
 *
 * \param[out] ipcs_ro   New IPC server for read-only CIB manager API
 * \param[out] ipcs_rw   New IPC server for read/write CIB manager API
 * \param[out] ipcs_shm  New IPC server for shared-memory CIB manager API
 * \param[in]  ro_cb     IPC callbacks for read-only API
 * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
 *
 * \note This function exits fatally if unable to create the servers.
 * \note There is no actual difference between the three IPC endpoints other
 *       than their names.
 */
void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
                           qb_ipcs_service_t **ipcs_rw,
                           qb_ipcs_service_t **ipcs_shm,
                           struct qb_ipcs_service_handlers *ro_cb,
                           struct qb_ipcs_service_handlers *rw_cb)
{
    *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
                                       QB_IPC_NATIVE, ro_cb);

    *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
                                       QB_IPC_NATIVE, rw_cb);

    *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
                                        QB_IPC_SHM, rw_cb);

    if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
        crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
        crm_exit(CRM_EX_FATAL);
    }
}
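
/* Example (illustrative setup sketch, not a library function): minimal
 * callback table and server creation for the CIB manager endpoints.  The
 * callbacks are hypothetical stubs; a real daemon would authenticate
 * connections (for example with pcmk__new_client()) and parse messages in
 * msg_process.
 */
static int32_t
example_based_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    return 0; // accept every connection
}

static int32_t
example_based_dispatch(qb_ipcs_connection_t *c, void *data, size_t size)
{
    return 0; // a real handler would process the incoming message here
}

static void
example_serve_based(void)
{
    static struct qb_ipcs_service_handlers handlers = {
        .connection_accept = example_based_accept,
        .msg_process = example_based_dispatch,
    };
    qb_ipcs_service_t *ro = NULL;
    qb_ipcs_service_t *rw = NULL;
    qb_ipcs_service_t *shm = NULL;

    // Exits fatally if any of the three endpoints cannot be created
    pcmk__serve_based_ipc(&ro, &rw, &shm, &handlers, &handlers);
}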

/*!
 * \internal
 * \brief Destroy IPC servers for the CIB manager API
 *
 * \param[in,out] ipcs_ro   IPC server for the read-only CIB manager API
 * \param[in,out] ipcs_rw   IPC server for the read/write CIB manager API
 * \param[in,out] ipcs_shm  IPC server for the shared-memory CIB manager API
 *
 * \note This is a convenience function for calling qb_ipcs_destroy() for each
 *       argument.
 */
void
pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
                     qb_ipcs_service_t *ipcs_rw,
                     qb_ipcs_service_t *ipcs_shm)
{
    qb_ipcs_destroy(ipcs_ro);
    qb_ipcs_destroy(ipcs_rw);
    qb_ipcs_destroy(ipcs_shm);
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the controller API
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Newly created IPC server
 */
qb_ipcs_service_t *
pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
{
    return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the attribute manager API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally if unable to create the server.
 */
void
pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
                      struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(PCMK__VALUE_ATTRD, QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        crm_crit("Exiting fatally because unable to serve " PCMK__SERVER_ATTRD
                 " IPC (verify pacemaker and pacemaker_remote are not both "
                 "enabled)");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the fencer API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally if unable to create the server.
 */
void
pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
                       struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
                                              QB_LOOP_HIGH);

    if (*ipcs == NULL) {
        crm_err("Failed to create fencer: exiting and inhibiting respawn.");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the pacemakerd API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits with CRM_EX_OSERR if unable to create the server.
 */
void
pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
                           struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        crm_err("Couldn't start pacemakerd IPC server");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
        /* Sub-daemons are observed by pacemakerd, so they exit CRM_EX_FATAL
         * when they want to prevent pacemakerd from respawning them.  For
         * pacemakerd itself, we keep the exit code shown to (for example)
         * systemd as it was before this code moved here from pacemakerd.c.
         */
        crm_exit(CRM_EX_OSERR);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the scheduler API
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Newly created IPC server
 * \note Callers are responsible for checking for a NULL return and exiting
 *       fatally if unable to create the server.
 */
qb_ipcs_service_t *
pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
{
    return mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, cb);
}
