pacemaker 3.0.1-16e74fc4da
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
ipc_server.c
Go to the documentation of this file.
1/*
2 * Copyright 2004-2025 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10#include <crm_internal.h>
11
12#include <stdio.h>
13#include <errno.h>
14#include <bzlib.h>
15#include <sys/stat.h>
16#include <sys/types.h>
17
18#include <crm/crm.h>
19#include <crm/common/xml.h>
20#include <crm/common/ipc.h>
22#include "crmcommon_private.h"
23
24/* Evict clients whose event queue grows this large (by default) */
25#define PCMK_IPC_DEFAULT_QUEUE_MAX 500
26
27static GHashTable *client_connections = NULL;
28
/*!
 * \internal
 * \brief Return the number of active IPC client connections
 *
 * \return Number of entries in the client connection table, or 0 if the
 *         table has not been created yet
 *
 * NOTE(review): the extraction appears to have dropped the line bearing this
 * function's name (presumably pcmk__ipc_client_count(void)) — confirm
 * against upstream.
 */
guint
{
    return client_connections? g_hash_table_size(client_connections) : 0;
}
40
50void
51pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
52{
53 if ((func != NULL) && (client_connections != NULL)) {
54 g_hash_table_foreach(client_connections, func, user_data);
55 }
56}
57
/*!
 * \internal
 * \brief Find the client object registered for a libqb IPC connection
 *
 * \param[in] c  libqb connection to look up
 *
 * \return Registered client object, or NULL if none
 *
 * NOTE(review): the return-type line (presumably pcmk__client_t *) appears
 * to have been dropped by the extraction — confirm against upstream.
 */
pcmk__find_client(const qb_ipcs_connection_t *c)
{
    if (client_connections) {
        return g_hash_table_lookup(client_connections, c);
    }

    crm_trace("No client found for %p", c);
    return NULL;
}
68
/*!
 * \internal
 * \brief Find a registered IPC client by its generated ID string
 *
 * \param[in] id  Client ID to search for (may be NULL)
 *
 * \return Matching client object, or NULL if none found
 *
 * NOTE(review): the signature lines (presumably
 * "pcmk__client_t * pcmk__find_client_by_id(const char *id)") appear to
 * have been dropped by the extraction — confirm against upstream.
 */
{
    if ((client_connections != NULL) && (id != NULL)) {
        gpointer key;
        pcmk__client_t *client = NULL;
        GHashTableIter iter;

        /* The table is keyed by connection pointer (or ID, for clients
         * without a connection), so matching by ID string needs a full scan
         */
        g_hash_table_iter_init(&iter, client_connections);
        while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
            if (strcmp(client->id, id) == 0) {
                return client;
            }
        }
    }
    crm_trace("No client found with id='%s'", pcmk__s(id, ""));
    return NULL;
}
87
/*!
 * \internal
 * \brief Get a human-readable identifier for a client, suitable for logs
 *
 * Prefers the client's name, then its generated ID; never returns NULL.
 *
 * \param[in] c  Client to name (may be NULL)
 *
 * \return Client name, client ID, or a placeholder string
 *
 * NOTE(review): the line bearing this function's name (presumably
 * pcmk__client_name(const pcmk__client_t *c)) appears to have been dropped
 * by the extraction — confirm against upstream.
 */
const char *
{
    if (c == NULL) {
        return "(unspecified)";

    } else if (c->name != NULL) {
        return c->name;

    } else if (c->id != NULL) {
        return c->id;

    } else {
        return "(unidentified)";
    }
}
113
/*!
 * \internal
 * \brief Destroy the client connection table at shutdown
 *
 * Logs a warning if any clients are still connected, then frees the table.
 * The table was created without value destructors (see the g_hash_table_new
 * call in this file), so the client objects themselves are not freed here.
 *
 * NOTE(review): the line bearing this function's name (presumably
 * pcmk__client_cleanup(void)) appears to have been dropped by the
 * extraction — confirm against upstream.
 */
void
{
    if (client_connections != NULL) {
        int active = g_hash_table_size(client_connections);

        if (active > 0) {
            crm_warn("Exiting with %d active IPC client%s",
                     active, pcmk__plural_s(active));
        }
        g_hash_table_destroy(client_connections);
        client_connections = NULL;
    }
}
128
129void
130pcmk__drop_all_clients(qb_ipcs_service_t *service)
131{
132 qb_ipcs_connection_t *c = NULL;
133
134 if (service == NULL) {
135 return;
136 }
137
138 c = qb_ipcs_connection_first_get(service);
139
140 while (c != NULL) {
141 qb_ipcs_connection_t *last = c;
142
143 c = qb_ipcs_connection_next_get(service, last);
144
145 /* There really shouldn't be anyone connected at this point */
146 crm_notice("Disconnecting client %p, pid=%d...",
147 last, pcmk__client_pid(last));
148 qb_ipcs_disconnect(last);
149 qb_ipcs_connection_unref(last);
150 }
151}
152
/*!
 * \internal
 * \brief Allocate a new client object and register it in the client table
 *
 * \param[in] c           libqb connection (NULL for clients with no
 *                        connection, such as remote clients)
 * \param[in] key         Table key to register under (defaults to \p c if
 *                        set, otherwise to the newly generated client ID)
 * \param[in] uid_client  Connecting user's UID (used only when \p c is set)
 *
 * \return Newly allocated client object
 *
 * NOTE(review): the extraction appears to have dropped the line that
 * allocates \c client (which is used below without a visible declaration),
 * and at least one line after "client->ipcs = c;" — confirm against
 * upstream before relying on this listing.
 */
static pcmk__client_t *
client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
{

    if (c) {
        /* Resolve the UID to a username for ACL enforcement; fall back to
         * an unprivileged placeholder if the lookup fails
         */
        client->user = pcmk__uid2username(uid_client);
        if (client->user == NULL) {
            client->user = pcmk__str_copy("#unprivileged");
            crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
                    uid_client);
        }
        client->ipcs = c;
        client->pid = pcmk__client_pid(c);
        if (key == NULL) {
            key = c;
        }
    }

    client->id = crm_generate_uuid();
    if (key == NULL) {
        key = client->id;
    }
    /* Create the client table lazily on first use */
    if (client_connections == NULL) {
        crm_trace("Creating IPC client table");
        client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
    }
    g_hash_table_insert(client_connections, key, client);
    return client;
}
194
/*!
 * \internal
 * \brief Allocate and register a client object with no backing connection
 *
 * \param[in] key  Table key to register under (client ID is used when NULL)
 *
 * \return Newly allocated client object
 *
 * NOTE(review): the signature lines (presumably
 * "pcmk__client_t * pcmk__new_unauth_client(void *key)") appear to have
 * been dropped by the extraction — confirm against upstream.
 */
{
    return client_from_connection(NULL, key, 0);
}
207
/*!
 * \internal
 * \brief Create and register a client object for a new libqb connection
 *
 * \param[in] c           New libqb connection (must not be NULL)
 * \param[in] uid_client  Connecting user's UID
 * \param[in] gid_client  Connecting user's GID
 *
 * \return New client object, or NULL if \p c is NULL
 *
 * NOTE(review): several lines appear to have been dropped by the
 * extraction — the return-type line, the crm_warn() argument naming the
 * daemon user, and the body of the root/cluster-user branch near the end.
 * Also, \c uid_cluster is declared here as \c gid_t, which looks like a
 * typo for \c uid_t (it is passed as the UID out-parameter of
 * pcmk_daemon_user() and compared against a UID) — confirm upstream.
 */
pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
{
    gid_t uid_cluster = 0;
    gid_t gid_cluster = 0;

    pcmk__client_t *client = NULL;

    CRM_CHECK(c != NULL, return NULL);

    if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
        static bool need_log = TRUE;    // Warn only once per process

        if (need_log) {
            crm_warn("Could not find user and group IDs for user %s",
            need_log = FALSE;
        }
    }

    if (uid_client != 0) {
        // Non-root clients: restrict access to the cluster group
        crm_trace("Giving group %u access to new IPC connection", gid_cluster);
        /* Passing -1 to chown(2) means don't change */
        qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    }

    /* TODO: Do our own auth checking, return NULL if unauthorized */
    client = client_from_connection(c, NULL, uid_client);

    if ((uid_client == 0) || (uid_client == uid_cluster)) {
        /* Remember when a connection came from root or hacluster */
    }

    crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
              client->id, client->pid, uid_client, gid_client);
    return client;
}
246
247static struct iovec *
248pcmk__new_ipc_event(void)
249{
250 return (struct iovec *) pcmk__assert_alloc(2, sizeof(struct iovec));
251}
252
/*!
 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 *
 * Releases both entries' buffers (header and payload) and the vector
 * itself. Safe to call with NULL.
 *
 * \param[in,out] event  I/O vector to free (may be NULL)
 */
void
pcmk_free_ipc_event(struct iovec *event)
{
    if (event == NULL) {
        return;
    }
    free(event[0].iov_base);
    free(event[1].iov_base);
    free(event);
}
267
268static void
269free_event(gpointer data)
270{
271 pcmk_free_ipc_event((struct iovec *) data);
272}
273
274static void
275add_event(pcmk__client_t *c, struct iovec *iov)
276{
277 if (c->event_queue == NULL) {
278 c->event_queue = g_queue_new();
279 }
280 g_queue_push_tail(c->event_queue, iov);
281}
282
/*!
 * \internal
 * \brief Free a client object and all resources it owns
 *
 * Removes the client from the connection table, cancels any pending flush
 * timer, drops queued events, and tears down remote-connection state
 * (including the TLS session, if any).
 *
 * \param[in,out] c  Client to free (may be NULL)
 *
 * NOTE(review): the line bearing this function's name (presumably
 * pcmk__free_client(pcmk__client_t *c)) appears to have been dropped by
 * the extraction — confirm against upstream.
 */
void
{
    if (c == NULL) {
        return;
    }

    /* Unregister from the client table, keyed by connection pointer for
     * local clients and by ID for clients without a connection
     */
    if (client_connections) {
        if (c->ipcs) {
            crm_trace("Destroying %p/%p (%d remaining)",
                      c, c->ipcs, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            crm_trace("Destroying remote connection %p (%d remaining)",
                      c, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->id);
        }
    }

    // Cancel any pending event-flush timer
    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    // Discard any events still queued for this client
    if (c->event_queue) {
        crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->buffer != NULL) {
        g_byte_array_free(c->buffer, TRUE);
        c->buffer = NULL;
    }

    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}
336
/*!
 * \internal
 * \brief Set a client's event queue eviction threshold from a string
 *
 * On any failure (non-numeric value, out-of-range value, or insufficient
 * privilege) the previous threshold is kept and the failure is logged.
 *
 * \param[in,out] client  Client to update (must not be NULL)
 * \param[in]     qmax    Requested maximum queue length, as a string
 *
 * NOTE(review): the extraction appears to have dropped the line bearing
 * this function's name and an opening privilege-check line — note the
 * unmatched "} else { rc = EACCES; }" below, which presumably pairs with a
 * dropped "if (pcmk_is_set(client->flags, pcmk__client_privileged)) {"
 * guard — confirm against upstream.
 */
void
{
    int rc = pcmk_rc_ok;
    long long qmax_ll = 0LL;
    unsigned int orig_value = 0U;

    CRM_CHECK(client != NULL, return);

    orig_value = client->queue_max;

        rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
        if (rc == pcmk_rc_ok) {
            // Only positive values that fit in unsigned int are accepted
            if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
                rc = ERANGE;
            } else {
                client->queue_max = (unsigned int) qmax_ll;
            }
        }
    } else {
        rc = EACCES;
    }

    if (rc != pcmk_rc_ok) {
        crm_info("Could not set IPC threshold for client %s[%u] to %s: %s",
                 pcmk__client_name(client), client->pid,
                 pcmk__s(qmax, "default"), pcmk_rc_str(rc));

    } else if (client->queue_max != orig_value) {
        crm_debug("IPC threshold for client %s[%u] is now %u (was %u)",
                  pcmk__client_name(client), client->pid,
                  client->queue_max, orig_value);
    }
}
379
380int
381pcmk__client_pid(qb_ipcs_connection_t *c)
382{
383 struct qb_ipcs_connection_stats stats;
384
385 stats.client_pid = 0;
386 qb_ipcs_connection_stats_get(c, &stats, 0);
387 return stats.client_pid;
388}
389
/*!
 * \internal
 * \brief Parse a client's received IPC buffer into XML
 *
 * \param[in,out] c      Client whose buffer holds a complete IPC message
 * \param[out]    id     If non-NULL, set to the message's request ID
 * \param[out]    flags  If non-NULL, set to the message's IPC flags
 *
 * \return Parsed XML, or NULL if the IPC header is invalid
 *
 * NOTE(review): the extraction appears to have dropped the statement inside
 * the crm_ipc_proxied branch (per the comment there, presumably marking the
 * client as proxied) — confirm against upstream.
 */
xmlNode *
pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
{
    xmlNode *xml = NULL;
    // The buffer starts with an IPC header; the XML text follows it
    pcmk__ipc_header_t *header = (void *) c->buffer->data;
    char *text = (char *) header + sizeof(pcmk__ipc_header_t);

    if (!pcmk__valid_ipc_header(header)) {
        return NULL;
    }

    if (id) {
        *id = header->qb.id;
    }

    if (flags) {
        *flags = header->flags;
    }

    if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
        /* Mark this client as being the endpoint of a proxy connection.
         * Proxy connections responses are sent on the event channel, to avoid
         * blocking the controller serving as proxy.
         */
    }

    // The payload must be NUL-terminated
    pcmk__assert(text[header->size - 1] == 0);

    xml = pcmk__xml_parse(text);
    crm_log_xml_trace(xml, "[IPC received]");
    return xml;
}
433
434static int crm_ipcs_flush_events(pcmk__client_t *c);
435
436static gboolean
437crm_ipcs_flush_events_cb(gpointer data)
438{
439 pcmk__client_t *c = data;
440
441 c->event_timer = 0;
442 crm_ipcs_flush_events(c);
443 return FALSE;
444}
445
453static inline void
454delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
455{
456 /* Delay a maximum of 1.5 seconds */
457 guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
458
459 c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
460}
461
/*!
 * \internal
 * \brief Send a client's queued events, evicting unresponsive clients
 *
 * Sends up to 100 queued events per call. If events remain afterward, a
 * delayed retry is scheduled, and a client whose backlog keeps growing past
 * the configured maximum is disconnected.
 *
 * \param[in,out] c  Client whose queue should be flushed (may be NULL)
 *
 * \return Standard Pacemaker return code
 *
 * NOTE(review): the extraction appears to have dropped the statement in the
 * empty "else" of the retry loop (per the comment above the loop, presumably
 * a brief sleep before retrying on -EAGAIN) — confirm against upstream.
 */
static int
crm_ipcs_flush_events(pcmk__client_t *c)
{
    int rc = pcmk_rc_ok;
    ssize_t qb_rc = 0;
    unsigned int sent = 0;
    unsigned int queue_len = 0;

    if (c == NULL) {
        return rc;

    } else if (c->event_timer) {
        /* There is already a timer, wait until it goes off */
        crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
        return rc;
    }

    if (c->event_queue) {
        queue_len = g_queue_get_length(c->event_queue);
    }
    // Bound the work done per flush to 100 events
    while (sent < 100) {
        pcmk__ipc_header_t *header = NULL;
        struct iovec *event = NULL;

        if (c->event_queue) {
            // We don't pop unless send is successful
            event = g_queue_peek_head(c->event_queue);
        }
        if (event == NULL) { // Queue is empty
            break;
        }

        /* Retry sending the event up to five times. If we get -EAGAIN, sleep
         * a very short amount of time (too long here is bad) and try again.
         * If we simply exit the while loop on -EAGAIN, we'll have to wait until
         * the timer fires off again (up to 1.5 seconds - see delay_next_flush)
         * to retry sending the message.
         *
         * In that case, the queue may just continue to grow faster than we are
         * processing it, eventually leading to daemons timing out waiting for
         * replies, which will cause wider failures.
         */
        for (unsigned int retries = 5; retries > 0; retries--) {
            qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);

            if (qb_rc < 0) {
                if (retries == 1 || qb_rc != -EAGAIN) {
                    // Out of retries, or a hard error: report and stop
                    rc = (int) -qb_rc;
                    goto no_more_retries;
                } else {
                    /* NOTE(review): a statement appears to be missing here
                     * in this extraction (likely a brief sleep) — confirm
                     */
                }
            } else {
                break;
            }
        }

        // Send succeeded; now it is safe to pop and free the event
        event = g_queue_pop_head(c->event_queue);

        sent++;
        header = event[0].iov_base;
        crm_trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
                  header->qb.id, c->ipcs, c->pid, qb_rc,
                  (char *) (event[1].iov_base));
        pcmk_free_ipc_event(event);
    }

no_more_retries:
    queue_len -= sent;
    if (sent > 0 || queue_len) {
        crm_trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)",
                  sent, queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
    }

    if (queue_len) {

        /* Allow clients to briefly fall behind on processing incoming messages,
         * but drop completely unresponsive clients so the connection doesn't
         * consume resources indefinitely.
         */
        if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
            if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
                /* Don't evict for a new or shrinking backlog */
                crm_warn("Client with process ID %u has a backlog of %u messages "
                         QB_XS " %p", c->pid, queue_len, c->ipcs);
            } else {
                crm_err("Evicting client with process ID %u due to backlog of %u messages "
                        QB_XS " %p", c->pid, queue_len, c->ipcs);
                c->queue_backlog = 0;
                qb_ipcs_disconnect(c->ipcs);
                return rc;
            }
        }

        c->queue_backlog = queue_len;
        delay_next_flush(c, queue_len);

    } else {
        /* Event queue is empty, there is no backlog */
        c->queue_backlog = 0;
    }

    return rc;
}
574
/*!
 * \internal
 * \brief Prepare one chunk of an IPC message as an I/O vector
 *
 * Builds a two-entry iovec (header + NUL-terminated payload chunk) for the
 * chunk of \p message selected by \p index. Supports multipart messages:
 * each call sends at most one buffer's worth of payload.
 *
 * \param[in]  request  Request ID this message replies to
 * \param[in]  message  Full message text to (possibly partially) send
 * \param[in]  index    Chunk number (0-based); selects the payload offset
 * \param[out] result   Where to store the newly allocated iovec
 * \param[out] bytes    If non-NULL, set to total bytes in the iovec
 *
 * \return pcmk_rc_ok if this is the final (or only) chunk,
 *         pcmk_rc_ipc_more if more chunks remain, or an errno on failure
 *
 * NOTE(review): the extraction appears to have dropped the flag arguments
 * of both pcmk__set_ipc_flags() calls near the end (presumably
 * crm_ipc_multipart and/or crm_ipc_multipart_end), and a cleanup statement
 * in the error path after "done:" (presumably pcmk_free_ipc_event(iov)) —
 * confirm against upstream.
 */
int
pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index,
                      struct iovec **result, ssize_t *bytes)
{
    struct iovec *iov = NULL;
    unsigned int payload_size = 0;
    unsigned int total = 0;
    unsigned int max_send_size = crm_ipc_default_buffer_size();
    unsigned int max_chunk_size = 0;
    size_t offset = 0;
    pcmk__ipc_header_t *header = NULL;
    int rc = pcmk_rc_ok;

    if ((message == NULL) || (result == NULL)) {
        rc = EINVAL;
        goto done;
    }

    header = calloc(1, sizeof(pcmk__ipc_header_t));
    if (header == NULL) {
        rc = ENOMEM;
        goto done;
    }

    *result = NULL;
    iov = pcmk__new_ipc_event();
    iov[0].iov_len = sizeof(pcmk__ipc_header_t);
    iov[0].iov_base = header;

    header->version = PCMK__IPC_VERSION;

    /* We are passed an index, which is basically how many times this function
     * has been called. This is how we support multi-part IPC messages. We
     * need to convert that into an offset into the buffer that we want to start
     * reading from.
     *
     * Each call to this function can send max_send_size, but this also includes
     * the header and a null terminator character for the end of the payload.
     * We need to subtract those out here.
     */
    max_chunk_size = max_send_size - iov[0].iov_len - 1;
    offset = index * max_chunk_size;

    /* How much of message is left to send? This does not include the null
     * terminator character.
     */
    payload_size = message->len - offset;

    /* How much would be transmitted, including the header size and null
     * terminator character for the buffer?
     */
    total = iov[0].iov_len + payload_size + 1;

    if (total >= max_send_size) {
        /* The entire packet is too big to fit in a single buffer. Calculate
         * how much of it we can send - buffer size, minus header size, minus
         * one for the null terminator.
         */
        payload_size = max_chunk_size;

        header->size = payload_size + 1;

        // strndup also appends the NUL terminator for this chunk
        iov[1].iov_base = strndup(message->str + offset, payload_size);
        if (iov[1].iov_base == NULL) {
            rc = ENOMEM;
            goto done;
        }

        iov[1].iov_len = header->size;
        rc = pcmk_rc_ipc_more;

    } else {
        /* The entire packet fits in a single buffer. We can copy the entirety
         * of it into the payload.
         */
        header->size = payload_size + 1;

        iov[1].iov_base = pcmk__str_copy(message->str + offset);
        iov[1].iov_len = header->size;
    }

    header->part_id = index;
    header->qb.size = iov[0].iov_len + iov[1].iov_len;
    header->qb.id = (int32_t)request;    /* Replying to a specific request */

    if ((rc == pcmk_rc_ok) && (index != 0)) {
        /* NOTE(review): flag argument dropped by extraction — confirm */
        pcmk__set_ipc_flags(header->flags, "multipart ipc",
    } else if (rc == pcmk_rc_ipc_more) {
        /* NOTE(review): flag argument dropped by extraction — confirm */
        pcmk__set_ipc_flags(header->flags, "multipart ipc",
    }

    *result = iov;
    pcmk__assert(header->qb.size > 0);
    if (bytes != NULL) {
        *bytes = header->qb.size;
    }

done:
    if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) {
        /* NOTE(review): cleanup statement dropped by extraction — confirm */
    }

    return rc;
}
700
701/* Return the next available ID for a server event.
702 *
703 * For the parts of a multipart event, all parts should have the same ID as
704 * the first part.
705 */
706static uint32_t
707id_for_server_event(pcmk__ipc_header_t *header)
708{
709 static uint32_t id = 1;
710
711 if (pcmk_is_set(header->flags, crm_ipc_multipart) && (header->part_id != 0)) {
712 return id;
713 } else {
714 id++;
715 return id;
716 }
717}
718
/*!
 * \internal
 * \brief Send a prepared I/O vector to a client as a response or event
 *
 * Events (and all traffic to proxied clients) are queued and flushed via
 * crm_ipcs_flush_events(); direct responses are sent immediately with
 * qb_ipcs_response_sendv().
 *
 * \param[in,out] c      Client to send to
 * \param[in,out] iov    Prepared two-entry I/O vector (header + payload)
 * \param[in]     flags  Group of enum crm_ipc_flags
 *
 * \return Standard Pacemaker return code
 *
 * NOTE(review): this listing is badly damaged by the extraction — several
 * structural lines are missing (the proxied-client check opening the first
 * branch, the flag argument of the first pcmk__set_ipc_flags() call, the
 * "server event" branch condition, the crm_ipc_server_free ownership check,
 * the multipart-end check before part_text, and the free/flush lines near
 * the end). The braces below do not balance as shown; treat this as
 * indicative only and confirm against upstream.
 */
int
pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
{
    int rc = pcmk_rc_ok;
    pcmk__ipc_header_t *header = iov[0].iov_base;

    /* _ALL_ replies to proxied connections need to be sent as events */
        /* The proxied flag lets us know this was originally meant to be a
         * response, even though we're sending it over the event channel.
         */
        pcmk__set_ipc_flags(flags, "server event",
    }

    pcmk__set_ipc_flags(header->flags, "server event", flags);
        /* Server events don't use an ID, though we do set one in
         * pcmk__ipc_prepare_iov if the event is in response to a particular
         * request. In that case, we don't want to set a new ID here that
         * overwrites that one.
         *
         * @TODO: Since server event IDs aren't used anywhere, do we really
         * need to set this for any reason other than ease of logging?
         */
        if (header->qb.id == 0) {
            header->qb.id = id_for_server_event(header);
        }

            crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
            add_event(c, iov);

        } else {
            /* Caller keeps ownership of iov, so queue a deep copy instead */
            struct iovec *iov_copy = pcmk__new_ipc_event();

            crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
            iov_copy[0].iov_len = iov[0].iov_len;
            iov_copy[0].iov_base = malloc(iov[0].iov_len);
            memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);

            iov_copy[1].iov_len = iov[1].iov_len;
            iov_copy[1].iov_base = malloc(iov[1].iov_len);
            memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);

            add_event(c, iov_copy);
        }

        rc = crm_ipcs_flush_events(c);

    } else {
        // Direct response path
        ssize_t qb_rc;
        char *part_text = NULL;

        CRM_LOG_ASSERT(header->qb.id != 0);     /* Replying to a specific request */

            part_text = crm_strdup_printf(" (final part %d) ", header->part_id);
        } else if (pcmk_is_set(header->flags, crm_ipc_multipart)) {
            if (header->part_id == 0) {
                part_text = crm_strdup_printf(" (initial part %d) ", header->part_id);
            } else {
                part_text = crm_strdup_printf(" (part %d) ", header->part_id);
            }
        } else {
            part_text = crm_strdup_printf(" ");
        }

        qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
        if (qb_rc < header->qb.size) {
            // Partial or failed send
            if (qb_rc < 0) {
                rc = (int) -qb_rc;
            }

            crm_notice("Response %" PRId32 "%sto pid %u failed: %s "
                       QB_XS " bytes=%" PRId32 " rc=%zd ipcs=%p",
                       header->qb.id, part_text, c->pid, pcmk_rc_str(rc),
                       header->qb.size, qb_rc, c->ipcs);

        } else {
            crm_trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]",
                      header->qb.id, part_text, qb_rc, c->ipcs, c->pid);
            crm_trace("Text = %s", (char *) iov[1].iov_base);
        }

        free(part_text);

        }

        crm_ipcs_flush_events(c);
    }

    if ((rc == EPIPE) || (rc == ENOTCONN)) {
        crm_trace("Client %p disconnected", c->ipcs);
    }

    return rc;
}
820
/*!
 * \internal
 * \brief Serialize XML and send it to a client, chunking as needed
 *
 * Repeatedly prepares and sends chunks via pcmk__ipc_prepare_iov() /
 * pcmk__ipc_send_iov() until the whole message has been sent or an error
 * occurs. EAGAIN is tolerated for events (they are queued for later flush)
 * but treated as a retry condition for direct responses.
 *
 * \param[in,out] c        Client to send to (must not be NULL)
 * \param[in]     request  Request ID this message replies to
 * \param[in]     message  XML to serialize and send
 * \param[in]     flags    Group of enum crm_ipc_flags
 *
 * \return Standard Pacemaker return code
 *
 * NOTE(review): the extraction appears to have dropped the continuation of
 * the event_or_proxied assignment (presumably
 * "|| pcmk_is_set(c->flags, pcmk__client_proxied);") and one line in each
 * rc-switch case — confirm against upstream.
 */
int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;
    GString *iov_buffer = NULL;
    uint16_t index = 0;
    bool event_or_proxied = false;

    if (c == NULL) {
        return EINVAL;
    }

    iov_buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, iov_buffer, 0);

    /* Testing crm_ipc_server_event is obvious. pcmk__client_proxied is less
     * obvious. According to pcmk__ipc_send_iov, replies to proxied connections
     * need to be sent as events. However, do_local_notify (which calls this
     * function) will clear all flags so we can't go just by crm_ipc_server_event.
     *
     * Changing do_local_notify to check for a proxied connection first results
     * in processes on the Pacemaker Remote node (like cibadmin or crm_mon)
     * timing out when waiting for a reply.
     */
    event_or_proxied = pcmk_is_set(flags, crm_ipc_server_event)

    do {
        rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL);

        switch (rc) {
            case pcmk_rc_ok:
                /* No more chunks to send after this one */
                rc = pcmk__ipc_send_iov(c, iov, flags);

                if (event_or_proxied) {
                    if (rc == EAGAIN) {
                        /* Return pcmk_rc_ok instead so callers don't have to know
                         * whether they passed an event or not when interpreting
                         * the return code.
                         */
                        rc = pcmk_rc_ok;
                    }
                } else {
                    /* EAGAIN is an error for IPC messages. We don't have a
                     * send queue for these, so we need to try again. If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == EAGAIN) {
                        break;
                    }
                }

                goto done;

            case pcmk_rc_ipc_more:
                /* There are more chunks to send after this one */
                rc = pcmk__ipc_send_iov(c, iov, flags);

                /* Did an error occur during transmission? */
                if (event_or_proxied) {
                    /* EAGAIN is not an error for server events. The event
                     * will be queued for transmission and we will attempt
                     * sending it again the next time pcmk__ipc_send_iov is
                     * called, or when the crm_ipcs_flush_events_cb happens.
                     */
                    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
                        goto done;
                    }

                    index++;
                    break;

                } else {
                    /* EAGAIN is an error for IPC messages. We don't have a
                     * send queue for these, so we need to try again. If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == pcmk_rc_ok) {
                        index++;
                        break;
                    } else if (rc == EAGAIN) {
                        break;
                    } else {
                        goto done;
                    }
                }

            default:
                /* An error occurred during preparation */
                goto done;
        }
    } while (true);

done:
    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
        crm_notice("IPC message to pid %u failed: %s " QB_XS " rc=%d",
                   c->pid, pcmk_rc_str(rc), rc);
    }

    g_string_free(iov_buffer, TRUE);
    return rc;
}
934
/*!
 * \internal
 * \brief Create an acknowledgement XML message for an IPC request
 *
 * \param[in] function  Name of function that's sending the ack (for tracing)
 * \param[in] line      Source line of the caller (for tracing)
 * \param[in] flags     Group of enum crm_ipc_flags
 * \param[in] tag       Element name for the ack
 * \param[in] ver       IPC protocol version string (use appears to have been
 *                      dropped by the extraction; see note)
 * \param[in] status    Exit status code to include in the ack
 *
 * \return Newly created ack XML, or NULL (presumably when the flags don't
 *         request a response — see note)
 *
 * NOTE(review): the extraction appears to have dropped the opening guard
 * (note the unmatched "}" below, presumably paired with
 * "if (pcmk_is_set(flags, crm_ipc_client_response)) {") and a line adding
 * the protocol version attribute (likely PCMK__XA_IPC_PROTO_VERSION with
 * \p ver) — confirm against upstream.
 */
xmlNode *
pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
                        const char *tag, const char *ver, crm_exit_t status)
{
    xmlNode *ack = NULL;

        ack = pcmk__xe_create(NULL, tag);
        crm_xml_add(ack, PCMK_XA_FUNCTION, function);
        crm_xml_add_int(ack, PCMK__XA_LINE, line);
        crm_xml_add_int(ack, PCMK_XA_STATUS, (int) status);
    }
    return ack;
}
966
982int
983pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
984 uint32_t request, uint32_t flags, const char *tag,
985 const char *ver, crm_exit_t status)
986{
987 int rc = pcmk_rc_ok;
988 xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver, status);
989
990 if (ack != NULL) {
991 crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
992 pcmk__client_name(c), tag, status);
993 crm_log_xml_trace(ack, "sent-ack");
994 c->request_id = 0;
995 rc = pcmk__ipc_send_xml(c, request, ack, flags);
996 pcmk__xml_free(ack);
997 }
998 return rc;
999}
1000
/*!
 * \internal
 * \brief Start the three IPC servers used by the CIB manager
 *
 * \param[out] ipcs_ro   Where to store the read-only service
 * \param[out] ipcs_rw   Where to store the read-write service
 * \param[out] ipcs_shm  Where to store the shared-memory service
 * \param[in]  ro_cb     Connection callbacks for the read-only service
 * \param[in]  rw_cb     Connection callbacks for the read-write and
 *                       shared-memory services
 *
 * NOTE(review): the extraction appears to have dropped the first half of
 * each of the three server-creation statements (presumably
 * "*ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO," etc.) and a
 * fatal-exit call in the failure branch — confirm against upstream.
 */
void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
                           qb_ipcs_service_t **ipcs_rw,
                           qb_ipcs_service_t **ipcs_shm,
                           struct qb_ipcs_service_handlers *ro_cb,
                           struct qb_ipcs_service_handlers *rw_cb)
{
        QB_IPC_NATIVE, ro_cb);

        QB_IPC_NATIVE, rw_cb);

        QB_IPC_SHM, rw_cb);

    // Any failure here is fatal; the servers are required
    if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
        crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
    }
}
1036
1048void
1049pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
1050 qb_ipcs_service_t *ipcs_rw,
1051 qb_ipcs_service_t *ipcs_shm)
1052{
1053 qb_ipcs_destroy(ipcs_ro);
1054 qb_ipcs_destroy(ipcs_rw);
1055 qb_ipcs_destroy(ipcs_shm);
1056}
1057
1066qb_ipcs_service_t *
1067pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
1068{
1069 return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
1070}
1071
/*!
 * \internal
 * \brief Start the IPC server for the attribute manager
 *
 * \param[out] ipcs  Where to store the new service
 * \param[in]  cb    Connection callbacks for the new server
 *
 * NOTE(review): the extraction appears to have dropped a line in the
 * failure branch (presumably a fatal crm_exit() call, per the "Exiting
 * fatally" message) — confirm against upstream.
 */
void
pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
                      struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(PCMK__VALUE_ATTRD, QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        crm_crit("Exiting fatally because unable to serve " PCMK__SERVER_ATTRD
                 " IPC (verify pacemaker and pacemaker_remote are not both "
                 "enabled)");
    }
}
1094
/*!
 * \internal
 * \brief Start the IPC server for the fencer, at high main-loop priority
 *
 * \param[out] ipcs  Where to store the new service
 * \param[in]  cb    Connection callbacks for the new server
 *
 * NOTE(review): the extraction appears to have dropped a line in the
 * failure branch (presumably a fatal crm_exit() call, per "inhibiting
 * respawn") — confirm against upstream.
 */
void
pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
                       struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
                                              QB_LOOP_HIGH);

    if (*ipcs == NULL) {
        crm_err("Failed to create fencer: exiting and inhibiting respawn.");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
    }
}
1117
/*!
 * \internal
 * \brief Start the IPC server for pacemakerd
 *
 * \param[out] ipcs  Where to store the new service
 * \param[in]  cb    Connection callbacks for the new server
 *
 * NOTE(review): the extraction appears to have dropped a line in the
 * failure branch (an exit call — the in-code comment below discusses the
 * exit code choice) — confirm against upstream.
 */
void
pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
                           struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        crm_err("Couldn't start pacemakerd IPC server");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
        /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
         * if we want to prevent pacemakerd from restarting them.
         * With pacemakerd we leave the exit-code shown to e.g. systemd
         * to what it was prior to moving the code here from pacemakerd.c
         */
    }
}
1144
1154qb_ipcs_service_t *
1155pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
1156{
1157 return mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, cb);
1158}
char * pcmk__uid2username(uid_t uid)
Definition acl.c:823
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
Definition utils.c:405
void pcmk__sleep_ms(unsigned int ms)
Definition utils.c:359
#define pcmk__assert_alloc(nmemb, size)
Definition internal.h:246
uint64_t flags
Definition remote.c:3
int pcmk_daemon_user(uid_t *uid, gid_t *gid)
Get user and group IDs of pacemaker daemon user.
Definition utils.c:143
char * crm_generate_uuid(void)
Definition utils.c:339
#define pcmk_is_set(g, f)
Convenience alias for pcmk_all_flags_set(), to check single flag.
Definition util.h:80
#define CRM_DAEMON_USER
Definition config.h:27
char data[0]
Definition cpg.c:10
uint32_t id
Definition cpg.c:0
A dumping ground.
#define CRM_SYSTEM_CRMD
Definition crm.h:84
#define CRM_SYSTEM_MCP
Definition crm.h:88
#define CRM_SYSTEM_PENGINE
Definition crm.h:86
#define PCMK__SERVER_BASED_RO
#define PCMK__SERVER_BASED_RW
#define PCMK__SERVER_BASED_SHM
struct pcmk__ipc_header_s pcmk__ipc_header_t
G_GNUC_INTERNAL bool pcmk__valid_ipc_header(const pcmk__ipc_header_t *header)
Definition ipc_common.c:45
#define PCMK__IPC_VERSION
IPC interface to Pacemaker daemons.
unsigned int crm_ipc_default_buffer_size(void)
Return pacemaker's IPC buffer size.
Definition ipc_common.c:31
@ crm_ipc_proxied_relay_response
Definition ipc.h:152
@ crm_ipc_multipart
This is a multi-part IPC message.
Definition ipc.h:154
@ crm_ipc_server_event
Send an Event instead of a Response.
Definition ipc.h:147
@ crm_ipc_client_response
A response is expected in reply.
Definition ipc.h:142
@ crm_ipc_proxied
ALL replies to proxied connections need to be sent as events
Definition ipc.h:140
@ crm_ipc_server_free
Free the iovec after sending.
Definition ipc.h:149
@ crm_ipc_multipart_end
This is the end of a multi-part IPC message.
Definition ipc.h:156
#define pcmk__set_client_flags(client, flags_to_set)
#define pcmk__set_ipc_flags(ipc_flags, ipc_name, flags_to_set)
@ pcmk__client_proxied
Client IPC is proxied.
@ pcmk__client_ipc
Client uses plain IPC.
@ pcmk__client_privileged
Client is run by root or cluster user.
pcmk__client_t * pcmk__find_client_by_id(const char *id)
Definition ipc_server.c:70
const char * pcmk__client_name(const pcmk__client_t *c)
Definition ipc_server.c:98
int pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index, struct iovec **result, ssize_t *bytes)
Definition ipc_server.c:595
pcmk__client_t * pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
Definition ipc_server.c:209
int pcmk__client_pid(qb_ipcs_connection_t *c)
Definition ipc_server.c:381
#define PCMK_IPC_DEFAULT_QUEUE_MAX
Definition ipc_server.c:25
pcmk__client_t * pcmk__find_client(const qb_ipcs_connection_t *c)
Definition ipc_server.c:59
int pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c, uint32_t request, uint32_t flags, const char *tag, const char *ver, crm_exit_t status)
Definition ipc_server.c:983
pcmk__client_t * pcmk__new_unauth_client(void *key)
Allocate a new pcmk__client_t object and generate its ID.
Definition ipc_server.c:203
void pcmk_free_ipc_event(struct iovec *event)
Free an I/O vector created by pcmk__ipc_prepare_iov()
Definition ipc_server.c:259
guint pcmk__ipc_client_count(void)
Definition ipc_server.c:36
void pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
int pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, uint32_t flags)
Definition ipc_server.c:822
void pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
Definition ipc_server.c:345
qb_ipcs_service_t * pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
void pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro, qb_ipcs_service_t *ipcs_rw, qb_ipcs_service_t *ipcs_shm)
void pcmk__free_client(pcmk__client_t *c)
Definition ipc_server.c:284
xmlNode * pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
Definition ipc_server.c:401
void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro, qb_ipcs_service_t **ipcs_rw, qb_ipcs_service_t **ipcs_shm, struct qb_ipcs_service_handlers *ro_cb, struct qb_ipcs_service_handlers *rw_cb)
xmlNode * pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags, const char *tag, const char *ver, crm_exit_t status)
Definition ipc_server.c:952
void pcmk__client_cleanup(void)
Definition ipc_server.c:115
void pcmk__drop_all_clients(qb_ipcs_service_t *service)
Definition ipc_server.c:130
void pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
Definition ipc_server.c:51
int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
Definition ipc_server.c:720
void pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
qb_ipcs_service_t * pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
void pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
#define crm_info(fmt, args...)
Definition logging.h:365
#define crm_warn(fmt, args...)
Definition logging.h:360
#define crm_crit(fmt, args...)
Definition logging.h:354
#define CRM_LOG_ASSERT(expr)
Definition logging.h:196
#define crm_notice(fmt, args...)
Definition logging.h:363
#define CRM_CHECK(expr, failure_action)
Definition logging.h:213
#define crm_debug(fmt, args...)
Definition logging.h:368
#define crm_err(fmt, args...)
Definition logging.h:357
#define crm_log_xml_trace(xml, text)
Definition logging.h:378
#define crm_trace(fmt, args...)
Definition logging.h:370
qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio)
Start server-side API end-point, hooked into the internal event loop.
Definition mainloop.c:644
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition mainloop.c:637
#define PCMK__VALUE_ATTRD
int delay
Definition pcmk_fence.c:36
pcmk__action_result_t result
Definition pcmk_fence.c:37
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition results.c:617
@ CRM_EX_OSERR
External (OS/environmental) problem.
Definition results.h:254
@ CRM_EX_FATAL
Do not respawn.
Definition results.h:264
_Noreturn crm_exit_t crm_exit(crm_exit_t rc)
Definition results.c:1058
@ pcmk_rc_ipc_more
Definition results.h:113
@ pcmk_rc_ok
Definition results.h:159
enum crm_exit_e crm_exit_t
Exit status codes for tools and daemons.
#define pcmk__assert(expr)
#define PCMK__SERVER_ATTRD
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define pcmk__plural_s(i)
int pcmk__scan_ll(const char *text, long long *result, long long default_value)
Definition strings.c:92
#define pcmk__str_copy(str)
qb_ipcs_connection_t * ipcs
GByteArray * buffer
unsigned int queue_backlog
unsigned int pid
struct pcmk__remote_s * remote
GQueue * event_queue
unsigned int queue_max
struct qb_ipc_response_header qb
gnutls_session_t tls_session
Wrappers for and extensions to libxml2.
const char * crm_xml_add_int(xmlNode *node, const char *name, int value)
Create an XML attribute with specified name and integer value.
const char * crm_xml_add(xmlNode *node, const char *name, const char *value)
Create an XML attribute with specified name and value.
xmlNode * pcmk__xe_create(xmlNode *parent, const char *name)
void pcmk__xml_free(xmlNode *xml)
Definition xml.c:816
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition xml_io.c:370
xmlNode * pcmk__xml_parse(const char *input)
Definition xml_io.c:167
#define PCMK_XA_FUNCTION
Definition xml_names.h:292
#define PCMK_XA_STATUS
Definition xml_names.h:410
#define PCMK__XA_LINE
#define PCMK__XA_IPC_PROTO_VERSION