16 #include <sys/types.h>    25 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500    27 static GHashTable *client_connections = NULL;
    38     return client_connections? g_hash_table_size(client_connections) : 0;
    53     if ((func != NULL) && (client_connections != NULL)) {
    54         g_hash_table_foreach(client_connections, func, user_data);
    61     if (client_connections) {
    62         return g_hash_table_lookup(client_connections, c);
    76     if (client_connections && 
id) {
    77         g_hash_table_iter_init(&iter, client_connections);
    78         while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
    79             if (strcmp(client->id, 
id) == 0) {
    85     crm_trace(
"No client found with id=%s", 
id);
   102         return "(unspecified)";
   104     } 
else if (c->
name != NULL) {
   107     } 
else if (c->
id != NULL) {
   111         return "(unidentified)";
   118     if (client_connections != NULL) {
   119         int active = g_hash_table_size(client_connections);
   122             crm_err(
"Exiting with %d active IPC client%s",
   125         g_hash_table_destroy(client_connections); client_connections = NULL;
   132     qb_ipcs_connection_t *c = NULL;
   134     if (service == NULL) {
   138     c = qb_ipcs_connection_first_get(service);
   141         qb_ipcs_connection_t *last = c;
   143         c = qb_ipcs_connection_next_get(service, last);
   146         crm_notice(
"Disconnecting client %p, pid=%d...",
   148         qb_ipcs_disconnect(last);
   149         qb_ipcs_connection_unref(last);
   164 client_from_connection(qb_ipcs_connection_t *c, 
void *key, uid_t uid_client)
   168     if (client == NULL) {
   175         if (client->
user == NULL) {
   176             client->
user = strdup(
"#unprivileged");
   178             crm_err(
"Unable to enforce ACLs for user ID %d, assuming unprivileged",
   190     if (client->
id == NULL) {
   191         crm_err(
"Could not generate UUID for client");
   199     if (client_connections == NULL) {
   201         client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
   203     g_hash_table_insert(client_connections, key, client);
   226     gid_t uid_cluster = 0;
   227     gid_t gid_cluster = 0;
   234         static bool need_log = TRUE;
   237             crm_warn(
"Could not find user and group IDs for user %s",
   243     if (uid_client != 0) {
   244         crm_trace(
"Giving group %u access to new IPC connection", gid_cluster);
   246         qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
   250     client = client_from_connection(c, NULL, uid_client);
   251     if (client == NULL) {
   255     if ((uid_client == 0) || (uid_client == uid_cluster)) {
   260     crm_debug(
"New IPC client %s for PID %u with uid %d and gid %d",
   261               client->
id, client->
pid, uid_client, gid_client);
   265 static struct iovec *
   266 pcmk__new_ipc_event(
void)
   268     struct iovec *iov = calloc(2, 
sizeof(
struct iovec));
   283         free(event[0].iov_base);
   284         free(event[1].iov_base);
   290 free_event(gpointer 
data)
   311     if (client_connections) {
   313             crm_trace(
"Destroying %p/%p (%d remaining)",
   314                       c, c->
ipcs, g_hash_table_size(client_connections) - 1);
   315             g_hash_table_remove(client_connections, c->
ipcs);
   318             crm_trace(
"Destroying remote connection %p (%d remaining)",
   319                       c, g_hash_table_size(client_connections) - 1);
   320             g_hash_table_remove(client_connections, c->
id);
   362             && (qmax_ll > 0LL) && (qmax_ll <= UINT_MAX)) {
   363             client->
queue_max = (
unsigned int) qmax_ll;
   373     struct qb_ipcs_connection_stats stats;
   375     stats.client_pid = 0;
   376     qb_ipcs_connection_stats_get(c, &stats, 0);
   377     return stats.client_pid;
   396     char *uncompressed = NULL;
   405         *
id = ((
struct qb_ipc_response_header *)
data)->id;
   422         uncompressed = calloc(1, size_u);
   424         crm_trace(
"Decompressing message data %u bytes into %u bytes",
   427         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->
size_compressed, 1, 0);
   450 crm_ipcs_flush_events_cb(gpointer 
data)
   455     crm_ipcs_flush_events(c);
   470     guint 
delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
   488     unsigned int sent = 0;
   489     unsigned int queue_len = 0;
   505         struct iovec *
event = NULL;
   515         qb_rc = qb_ipcs_event_sendv(c->
ipcs, event, 2);
   523         header = 
event[0].iov_base;
   525             crm_trace(
"Event %d to %p[%d] (%lld compressed bytes) sent",
   526                       header->
qb.id, c->
ipcs, c->
pid, (
long long) qb_rc);
   528             crm_trace(
"Event %d to %p[%d] (%lld bytes) sent: %.120s",
   529                       header->
qb.id, c->
ipcs, c->
pid, (
long long) qb_rc,
   530                       (
char *) (event[1].iov_base));
   536     if (sent > 0 || queue_len) {
   537         crm_trace(
"Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
   538                   sent, queue_len, c->
ipcs, c->
pid,
   549             if ((c->
queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
   551                 crm_warn(
"Client with process ID %u has a backlog of %u messages "   554                 crm_err(
"Evicting client with process ID %u due to backlog of %u messages "   557                 qb_ipcs_disconnect(c->
ipcs);
   563         delay_next_flush(c, queue_len);
   587                       uint32_t max_send_size, 
struct iovec **result,
   590     static unsigned int biggest = 0;
   592     unsigned int total = 0;
   593     char *compressed = NULL;
   597     if ((message == NULL) || (result == NULL)) {
   602     if (header == NULL) {
   608     if (max_send_size == 0) {
   614     iov = pcmk__new_ipc_event();
   616     iov[0].iov_base = header;
   622     if (total < max_send_size) {
   623         iov[1].iov_base = buffer;
   627         unsigned int new_size = 0;
   630                            (
unsigned int) max_send_size, &compressed,
   637             iov[1].iov_base = compressed;
   647             crm_err(
"Could not compress %u-byte message into less than IPC "   648                     "limit of %u bytes; set PCMK_ipc_buffer to higher value "   649                     "(%u bytes suggested)",
   659     header->
qb.size = iov[0].iov_len + iov[1].iov_len;
   660     header->
qb.id = (int32_t)request;    
   665         *bytes = header->
qb.size;
   674     static uint32_t 
id = 1;
   691         header->
qb.id = 
id++;   
   698             struct iovec *iov_copy = pcmk__new_ipc_event();
   701             iov_copy[0].iov_len = iov[0].iov_len;
   702             iov_copy[0].iov_base = malloc(iov[0].iov_len);
   703             memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
   705             iov_copy[1].iov_len = iov[1].iov_len;
   706             iov_copy[1].iov_base = malloc(iov[1].iov_len);
   707             memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
   709             add_event(c, iov_copy);
   717         qb_rc = qb_ipcs_response_sendv(c->
ipcs, iov, 2);
   718         if (qb_rc < header->qb.size) {
   722             crm_notice(
"Response %d to pid %d failed: %s "   723                        CRM_XS " bytes=%u rc=%lld ipcs=%p",
   725                        header->
qb.size, (
long long) qb_rc, c->
ipcs);
   728             crm_trace(
"Response %d sent, %lld bytes to %p[%d]",
   729                       header->
qb.id, (
long long) qb_rc, c->
ipcs, c->
pid);
   738         rc = crm_ipcs_flush_events(c);
   740         crm_ipcs_flush_events(c);
   743     if ((
rc == EPIPE) || (
rc == ENOTCONN)) {
   753     struct iovec *iov = NULL;
   789                       uint32_t request, uint32_t 
flags, 
const char *tag,
   797         crm_trace(
"Ack'ing IPC message from client %s as <%s status=%d>",
   822                            qb_ipcs_service_t **ipcs_rw,
   823                            qb_ipcs_service_t **ipcs_shm,
   824                            struct qb_ipcs_service_handlers *ro_cb,
   825                            struct qb_ipcs_service_handlers *rw_cb)
   828                                        QB_IPC_NATIVE, ro_cb);
   831                                        QB_IPC_NATIVE, rw_cb);
   836     if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
   837         crm_err(
"Failed to create the CIB manager: exiting and inhibiting respawn");
   838         crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled");
   856                      qb_ipcs_service_t *ipcs_rw,
   857                      qb_ipcs_service_t *ipcs_shm)
   859     qb_ipcs_destroy(ipcs_ro);
   860     qb_ipcs_destroy(ipcs_rw);
   861     qb_ipcs_destroy(ipcs_shm);
   888                       struct qb_ipcs_service_handlers *cb)
   893         crm_err(
"Failed to create pacemaker-attrd server: exiting and inhibiting respawn");
   894         crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
   909                        struct qb_ipcs_service_handlers *cb)
   915         crm_err(
"Failed to create fencer: exiting and inhibiting respawn.");
   916         crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
   931                        struct qb_ipcs_service_handlers *cb)
   936         crm_err(
"Couldn't start pacemakerd IPC server");
   937         crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
   963             || !strcmp(
name, 
"stonith-ng")
   964             || !strcmp(
name, 
"attrd")
 
#define CRM_CHECK(expr, failure_action)
 
#define PCMK__SERVER_BASED_RW
 
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages. 
 
void pcmk__drop_all_clients(qb_ipcs_service_t *service)
 
#define crm_notice(fmt, args...)
 
const char * pcmk__client_name(pcmk__client_t *c)
 
const char * bz2_strerror(int rc)
 
char * crm_generate_uuid(void)
 
void pcmk__client_cleanup(void)
 
int pcmk__ipc_prepare_iov(uint32_t request, xmlNode *message, uint32_t max_send_size, struct iovec **result, ssize_t *bytes)
 
void pcmk__free_client(pcmk__client_t *c)
 
_Noreturn crm_exit_t crm_exit(crm_exit_t rc)
 
pcmk__client_t * pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
 
bool crm_is_daemon_name(const char *name)
Check whether string represents a client name used by cluster daemons. 
 
qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio)
Start server-side API end-point, hooked into the internal event loop. 
 
xmlNode * pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id, uint32_t *flags)
 
int pcmk__client_pid(qb_ipcs_connection_t *c)
 
const char * crm_xml_add_int(xmlNode *node, const char *name, int value)
Create an XML attribute with specified name and integer value. 
 
const char * crm_xml_add(xmlNode *node, const char *name, const char *value)
Create an XML attribute with specified name and value. 
 
void pcmk_free_ipc_event(struct iovec *event)
Free an I/O vector created by pcmk__ipc_prepare_iov() 
 
enum crm_exit_e crm_exit_t
 
#define CRM_LOG_ASSERT(expr)
 
guint pcmk__ipc_client_count()
 
int pcmk_daemon_user(uid_t *uid, gid_t *gid)
Get user and group IDs of pacemaker daemon user. 
 
struct pcmk__ipc_header_s pcmk__ipc_header_t
 
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code. 
 
void pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
 
xmlNode * string2xml(const char *input)
 
void pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
 
pcmk__client_t * pcmk__new_unauth_client(void *key)
Allocate a new pcmk__client_t object and generate its ID. 
 
#define pcmk__set_client_flags(client, flags_to_set)
 
void pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
 
int pcmk__scan_ll(const char *text, long long *result, long long default_value)
 
#define crm_warn(fmt, args...)
 
pcmk__client_t * pcmk__find_client_by_id(const char *id)
 
#define PCMK__IPC_VERSION
 
#define crm_debug(fmt, args...)
 
void pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs, struct qb_ipcs_service_handlers *cb)
 
External (OS/environmental) problem. 
 
#define crm_trace(fmt, args...)
 
#define pcmk_is_set(g, f)
Convenience alias for pcmk_all_flags_set(), to check single flag. 
 
G_GNUC_INTERNAL bool pcmk__valid_ipc_header(const pcmk__ipc_header_t *header)
 
int pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, xmlNode *message, uint32_t flags)
 
xmlNode * create_xml_node(xmlNode *parent, const char *name)
 
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
 
unsigned int crm_ipc_default_buffer_size(void)
Return pacemaker's default IPC buffer size. 
 
void free_xml(xmlNode *child)
 
#define PCMK__SERVER_BASED_RO
 
qb_ipcs_service_t * pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
 
int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
 
#define CRM_SYSTEM_STONITHD
 
#define PCMK_IPC_DEFAULT_QUEUE_MAX
 
#define CRM_SYSTEM_TENGINE
 
bool pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
 
#define crm_perror(level, fmt, args...)
Send a system error message to both the log and stderr. 
 
#define pcmk__set_ipc_flags(ipc_flags, ipc_name, flags_to_set)
 
#define crm_err(fmt, args...)
 
void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro, qb_ipcs_service_t **ipcs_rw, qb_ipcs_service_t **ipcs_shm, struct qb_ipcs_service_handlers *ro_cb, struct qb_ipcs_service_handlers *rw_cb)
 
char * dump_xml_unformatted(xmlNode *msg)
 
#define pcmk__plural_s(i)
 
pcmk__client_t * pcmk__find_client(qb_ipcs_connection_t *c)
 
IPC interface to Pacemaker daemons. 
 
#define crm_log_xml_trace(xml, text)
 
#define PCMK__SERVER_BASED_SHM
 
struct pcmk__remote_s * remote
 
char * pcmk__uid2username(uid_t uid)
 
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
 
void pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro, qb_ipcs_service_t *ipcs_rw, qb_ipcs_service_t *ipcs_shm)
 
unsigned int queue_backlog
 
qb_ipcs_connection_t * ipcs
 
int pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c, uint32_t request, uint32_t flags, const char *tag, crm_exit_t status)