25#define PCMK_IPC_DEFAULT_QUEUE_MAX 500
27static GHashTable *client_connections = NULL;
38 return client_connections? g_hash_table_size(client_connections) : 0;
53 if ((func != NULL) && (client_connections != NULL)) {
54 g_hash_table_foreach(client_connections, func, user_data);
61 if (client_connections) {
62 return g_hash_table_lookup(client_connections, c);
72 if ((client_connections != NULL) && (
id != NULL)) {
77 g_hash_table_iter_init(&iter, client_connections);
78 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
79 if (strcmp(client->
id,
id) == 0) {
84 crm_trace(
"No client found with id='%s'", pcmk__s(
id,
""));
101 return "(unspecified)";
103 }
else if (c->
name != NULL) {
106 }
else if (c->
id != NULL) {
110 return "(unidentified)";
117 if (client_connections != NULL) {
118 int active = g_hash_table_size(client_connections);
121 crm_warn(
"Exiting with %d active IPC client%s",
124 g_hash_table_destroy(client_connections);
125 client_connections = NULL;
132 qb_ipcs_connection_t *c = NULL;
134 if (service == NULL) {
138 c = qb_ipcs_connection_first_get(service);
141 qb_ipcs_connection_t *last = c;
143 c = qb_ipcs_connection_next_get(service, last);
146 crm_notice(
"Disconnecting client %p, pid=%d...",
148 qb_ipcs_disconnect(last);
149 qb_ipcs_connection_unref(last);
164client_from_connection(qb_ipcs_connection_t *c,
void *key, uid_t uid_client)
170 if (client->
user == NULL) {
172 crm_err(
"Unable to enforce ACLs for user ID %d, assuming unprivileged",
187 if (client_connections == NULL) {
189 client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
191 g_hash_table_insert(client_connections, key, client);
205 return client_from_connection(NULL, key, 0);
211 gid_t uid_cluster = 0;
212 gid_t gid_cluster = 0;
219 static bool need_log = TRUE;
222 crm_warn(
"Could not find user and group IDs for user %s",
228 if (uid_client != 0) {
229 crm_trace(
"Giving group %u access to new IPC connection", gid_cluster);
231 qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
235 client = client_from_connection(c, NULL, uid_client);
237 if ((uid_client == 0) || (uid_client == uid_cluster)) {
242 crm_debug(
"New IPC client %s for PID %u with uid %d and gid %d",
243 client->
id, client->
pid, uid_client, gid_client);
248pcmk__new_ipc_event(
void)
262 free(event[0].iov_base);
263 free(event[1].iov_base);
269free_event(gpointer
data)
290 if (client_connections) {
292 crm_trace(
"Destroying %p/%p (%d remaining)",
293 c, c->
ipcs, g_hash_table_size(client_connections) - 1);
294 g_hash_table_remove(client_connections, c->
ipcs);
297 crm_trace(
"Destroying remote connection %p (%d remaining)",
298 c, g_hash_table_size(client_connections) - 1);
299 g_hash_table_remove(client_connections, c->
id);
317 g_byte_array_free(c->
buffer, TRUE);
348 long long qmax_ll = 0LL;
349 unsigned int orig_value = 0U;
358 if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
361 client->
queue_max = (
unsigned int) qmax_ll;
369 crm_info(
"Could not set IPC threshold for client %s[%u] to %s: %s",
373 }
else if (client->
queue_max != orig_value) {
374 crm_debug(
"IPC threshold for client %s[%u] is now %u (was %u)",
383 struct qb_ipcs_connection_stats stats;
385 stats.client_pid = 0;
386 qb_ipcs_connection_stats_get(c, &stats, 0);
387 return stats.client_pid;
437crm_ipcs_flush_events_cb(gpointer
data)
442 crm_ipcs_flush_events(c);
457 guint
delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
475 unsigned int sent = 0;
476 unsigned int queue_len = 0;
492 struct iovec *
event = NULL;
512 for (
unsigned int retries = 5; retries > 0; retries--) {
513 qb_rc = qb_ipcs_event_sendv(c->
ipcs, event, 2);
516 if (retries == 1 || qb_rc != -EAGAIN) {
518 goto no_more_retries;
530 header =
event[0].iov_base;
531 crm_trace(
"Event %" PRId32
" to %p[%u] (%zd bytes) sent: %.120s",
533 (
char *) (event[1].iov_base));
539 if (sent > 0 || queue_len) {
540 crm_trace(
"Sent %u events (%u remaining) for %p[%d]: %s (%zd)",
551 if ((c->
queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
553 crm_warn(
"Client with process ID %u has a backlog of %u messages "
554 QB_XS
" %p", c->
pid, queue_len, c->
ipcs);
556 crm_err(
"Evicting client with process ID %u due to backlog of %u messages "
557 QB_XS
" %p", c->
pid, queue_len, c->
ipcs);
559 qb_ipcs_disconnect(c->
ipcs);
565 delay_next_flush(c, queue_len);
596 struct iovec **
result, ssize_t *bytes)
598 struct iovec *iov = NULL;
599 unsigned int payload_size = 0;
600 unsigned int total = 0;
602 unsigned int max_chunk_size = 0;
607 if ((message == NULL) || (
result == NULL)) {
613 if (header == NULL) {
619 iov = pcmk__new_ipc_event();
621 iov[0].iov_base = header;
634 max_chunk_size = max_send_size - iov[0].iov_len - 1;
635 offset = index * max_chunk_size;
640 payload_size = message->len - offset;
645 total = iov[0].iov_len + payload_size + 1;
647 if (total >= max_send_size) {
652 payload_size = max_chunk_size;
654 header->
size = payload_size + 1;
656 iov[1].iov_base = strndup(message->str + offset, payload_size);
657 if (iov[1].iov_base == NULL) {
662 iov[1].iov_len = header->
size;
669 header->
size = payload_size + 1;
672 iov[1].iov_len = header->
size;
676 header->
qb.size = iov[0].iov_len + iov[1].iov_len;
677 header->
qb.id = (int32_t)request;
690 *bytes = header->
qb.size;
709 static uint32_t
id = 1;
745 if (header->
qb.id == 0) {
746 header->
qb.id = id_for_server_event(header);
754 struct iovec *iov_copy = pcmk__new_ipc_event();
757 iov_copy[0].iov_len = iov[0].iov_len;
758 iov_copy[0].iov_base = malloc(iov[0].iov_len);
759 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
761 iov_copy[1].iov_len = iov[1].iov_len;
762 iov_copy[1].iov_base = malloc(iov[1].iov_len);
763 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
765 add_event(c, iov_copy);
768 rc = crm_ipcs_flush_events(c);
772 char *part_text = NULL;
788 qb_rc = qb_ipcs_response_sendv(c->
ipcs, iov, 2);
789 if (qb_rc < header->qb.size) {
794 crm_notice(
"Response %" PRId32
"%sto pid %u failed: %s "
795 QB_XS
" bytes=%" PRId32
" rc=%zd ipcs=%p",
797 header->
qb.size, qb_rc, c->
ipcs);
800 crm_trace(
"Response %" PRId32
"%ssent, %zd bytes to %p[%u]",
801 header->
qb.id, part_text, qb_rc, c->
ipcs, c->
pid);
802 crm_trace(
"Text = %s", (
char *) iov[1].iov_base);
811 crm_ipcs_flush_events(c);
814 if ((rc == EPIPE) || (rc == ENOTCONN)) {
825 struct iovec *iov = NULL;
827 GString *iov_buffer = NULL;
829 bool event_or_proxied =
false;
835 iov_buffer = g_string_sized_new(1024);
859 if (event_or_proxied) {
888 if (event_or_proxied) {
912 }
else if (rc == EAGAIN) {
927 crm_notice(
"IPC message to pid %u failed: %s " QB_XS
" rc=%d",
931 g_string_free(iov_buffer, TRUE);
953 const char *tag,
const char *ver,
crm_exit_t status)
984 uint32_t request, uint32_t
flags,
const char *tag,
991 crm_trace(
"Ack'ing IPC message from client %s as <%s status=%d>",
1016 qb_ipcs_service_t **ipcs_rw,
1017 qb_ipcs_service_t **ipcs_shm,
1018 struct qb_ipcs_service_handlers *ro_cb,
1019 struct qb_ipcs_service_handlers *rw_cb)
1022 QB_IPC_NATIVE, ro_cb);
1025 QB_IPC_NATIVE, rw_cb);
1030 if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
1031 crm_err(
"Failed to create the CIB manager: exiting and inhibiting respawn");
1032 crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled");
1050 qb_ipcs_service_t *ipcs_rw,
1051 qb_ipcs_service_t *ipcs_shm)
1053 qb_ipcs_destroy(ipcs_ro);
1054 qb_ipcs_destroy(ipcs_rw);
1055 qb_ipcs_destroy(ipcs_shm);
1083 struct qb_ipcs_service_handlers *cb)
1087 if (*ipcs == NULL) {
1089 " IPC (verify pacemaker and pacemaker_remote are not both "
1106 struct qb_ipcs_service_handlers *cb)
1111 if (*ipcs == NULL) {
1112 crm_err(
"Failed to create fencer: exiting and inhibiting respawn.");
1113 crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
1129 struct qb_ipcs_service_handlers *cb)
1133 if (*ipcs == NULL) {
1134 crm_err(
"Couldn't start pacemakerd IPC server");
1135 crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
char * pcmk__uid2username(uid_t uid)
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
void pcmk__sleep_ms(unsigned int ms)
#define pcmk__assert_alloc(nmemb, size)
int pcmk_daemon_user(uid_t *uid, gid_t *gid)
Get user and group IDs of pacemaker daemon user.
char * crm_generate_uuid(void)
#define pcmk_is_set(g, f)
Convenience alias for pcmk_all_flags_set(), to check single flag.
#define CRM_SYSTEM_PENGINE
#define PCMK__SERVER_BASED_RO
#define PCMK__SERVER_BASED_RW
#define PCMK__SERVER_BASED_SHM
struct pcmk__ipc_header_s pcmk__ipc_header_t
G_GNUC_INTERNAL bool pcmk__valid_ipc_header(const pcmk__ipc_header_t *header)
#define PCMK__IPC_VERSION
IPC interface to Pacemaker daemons.
unsigned int crm_ipc_default_buffer_size(void)
Return pacemaker's IPC buffer size.
@ crm_ipc_proxied_relay_response
@ crm_ipc_multipart
This is a multi-part IPC message.
@ crm_ipc_server_event
Send an Event instead of a Response.
@ crm_ipc_client_response
A response is expected in reply.
@ crm_ipc_proxied
ALL replies to proxied connections need to be sent as events
@ crm_ipc_server_free
Free the iovec after sending.
@ crm_ipc_multipart_end
This is the end of a multi-part IPC message.
#define pcmk__set_client_flags(client, flags_to_set)
#define pcmk__set_ipc_flags(ipc_flags, ipc_name, flags_to_set)
@ pcmk__client_proxied
Client IPC is proxied.
@ pcmk__client_ipc
Client uses plain IPC.
@ pcmk__client_privileged
Client is run by root or cluster user.
pcmk__client_t * pcmk__find_client_by_id(const char *id)
const char * pcmk__client_name(const pcmk__client_t *c)
int pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index, struct iovec **result, ssize_t *bytes)
pcmk__client_t * pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
int pcmk__client_pid(qb_ipcs_connection_t *c)
#define PCMK_IPC_DEFAULT_QUEUE_MAX
pcmk__client_t * pcmk__find_client(const qb_ipcs_connection_t *c)
int pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c, uint32_t request, uint32_t flags, const char *tag, const char *ver, crm_exit_t status)
pcmk__client_t * pcmk__new_unauth_client(void *key)
Allocate a new pcmk__client_t object and generate its ID.
void pcmk_free_ipc_event(struct iovec *event)
Free an I/O vector created by pcmk__ipc_prepare_iov()
guint pcmk__ipc_client_count(void)
void pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
int pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, uint32_t flags)
void pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
qb_ipcs_service_t * pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
void pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro, qb_ipcs_service_t *ipcs_rw, qb_ipcs_service_t *ipcs_shm)
void pcmk__free_client(pcmk__client_t *c)
xmlNode * pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro, qb_ipcs_service_t **ipcs_rw, qb_ipcs_service_t **ipcs_shm, struct qb_ipcs_service_handlers *ro_cb, struct qb_ipcs_service_handlers *rw_cb)
xmlNode * pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags, const char *tag, const char *ver, crm_exit_t status)
void pcmk__client_cleanup(void)
void pcmk__drop_all_clients(qb_ipcs_service_t *service)
void pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
void pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
qb_ipcs_service_t * pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
void pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
#define crm_info(fmt, args...)
#define crm_warn(fmt, args...)
#define crm_crit(fmt, args...)
#define CRM_LOG_ASSERT(expr)
#define crm_notice(fmt, args...)
#define CRM_CHECK(expr, failure_action)
#define crm_debug(fmt, args...)
#define crm_err(fmt, args...)
#define crm_log_xml_trace(xml, text)
#define crm_trace(fmt, args...)
qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio)
Start server-side API end-point, hooked into the internal event loop.
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
#define PCMK__VALUE_ATTRD
pcmk__action_result_t result
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
@ CRM_EX_OSERR
External (OS/environmental) problem.
@ CRM_EX_FATAL
Do not respawn.
_Noreturn crm_exit_t crm_exit(crm_exit_t rc)
enum crm_exit_e crm_exit_t
Exit status codes for tools and daemons.
#define pcmk__assert(expr)
#define PCMK__SERVER_ATTRD
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define pcmk__plural_s(i)
int pcmk__scan_ll(const char *text, long long *result, long long default_value)
#define pcmk__str_copy(str)
qb_ipcs_connection_t * ipcs
unsigned int queue_backlog
struct pcmk__remote_s * remote
gnutls_session_t tls_session
Wrappers for and extensions to libxml2.
const char * crm_xml_add_int(xmlNode *node, const char *name, int value)
Create an XML attribute with specified name and integer value.
const char * crm_xml_add(xmlNode *node, const char *name, const char *value)
Create an XML attribute with specified name and value.
xmlNode * pcmk__xe_create(xmlNode *parent, const char *name)
void pcmk__xml_free(xmlNode *xml)
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
xmlNode * pcmk__xml_parse(const char *input)
#define PCMK__XA_IPC_PROTO_VERSION