root/daemons/based/based_callbacks.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. cib_legacy_mode
  2. cib_ipc_accept
  3. cib_ipc_dispatch_rw
  4. cib_ipc_dispatch_ro
  5. cib_ipc_closed
  6. cib_ipc_destroy
  7. cib_common_callback_worker
  8. cib_common_callback
  9. cib_digester_cb
  10. process_ping_reply
  11. do_local_notify
  12. local_notify_destroy_callback
  13. check_local_notify
  14. queue_local_notify
  15. parse_local_options_v1
  16. parse_local_options_v2
  17. parse_local_options
  18. parse_peer_options_v1
  19. parse_peer_options_v2
  20. parse_peer_options
  21. forward_request
  22. send_peer_reply
  23. cib_process_request
  24. cib_process_command
  25. cib_peer_callback
  26. cib_force_exit
  27. disconnect_remote_client
  28. cib_shutdown
  29. initiate_exit
  30. terminate_cib

   1 /*
   2  * Copyright 2004-2020 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <sys/param.h>
  13 #include <stdio.h>
  14 #include <sys/types.h>
  15 #include <unistd.h>
  16 
  17 #include <stdlib.h>
  18 #include <stdint.h>     // uint32_t, uint64_t, UINT64_C()
  19 #include <errno.h>
  20 #include <fcntl.h>
  21 #include <inttypes.h>  /* U64T ~ PRIu64 */
  22 
  23 #include <crm/crm.h>
  24 #include <crm/cib.h>
  25 #include <crm/msg_xml.h>
  26 #include <crm/cluster/internal.h>
  27 
  28 #include <crm/common/xml.h>
  29 #include <crm/common/remote_internal.h>
  30 
  31 #include <pacemaker-based.h>
  32 
  33 #define EXIT_ESCALATION_MS 10000
  34 
  35 static unsigned long cib_local_bcast_num = 0;
  36 
  37 typedef struct cib_local_notify_s {
  38     xmlNode *notify_src;
  39     char *client_id;
  40     gboolean from_peer;
  41     gboolean sync_reply;
  42 } cib_local_notify_t;
  43 
  44 int next_client_id = 0;
  45 
  46 gboolean legacy_mode = FALSE;
  47 
  48 qb_ipcs_service_t *ipcs_ro = NULL;
  49 qb_ipcs_service_t *ipcs_rw = NULL;
  50 qb_ipcs_service_t *ipcs_shm = NULL;
  51 
  52 void send_cib_replace(const xmlNode * sync_request, const char *host);
  53 static void cib_process_request(xmlNode *request, gboolean privileged,
  54                                 pcmk__client_t *cib_client);
  55 
  56 
  57 static int cib_process_command(xmlNode *request, xmlNode **reply,
  58                                xmlNode **cib_diff, gboolean privileged);
  59 
  60 gboolean cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size,
  61                              gboolean privileged);
  62 
  63 gboolean cib_legacy_mode(void)
     /* [previous][next][first][last][top][bottom][index][help] */
  64 {
  65     return legacy_mode;
  66 }
  67 
  68 
  69 static int32_t
  70 cib_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid)
     /* [previous][next][first][last][top][bottom][index][help] */
  71 {
  72     if (cib_shutdown_flag) {
  73         crm_info("Ignoring new IPC client [%d] during shutdown",
  74                  pcmk__client_pid(c));
  75         return -EPERM;
  76     }
  77 
  78     if (pcmk__new_client(c, uid, gid) == NULL) {
  79         return -EIO;
  80     }
  81     return 0;
  82 }
  83 
  84 static int32_t
  85 cib_ipc_dispatch_rw(qb_ipcs_connection_t * c, void *data, size_t size)
     /* [previous][next][first][last][top][bottom][index][help] */
  86 {
  87     pcmk__client_t *client = pcmk__find_client(c);
  88 
  89     crm_trace("%p message from %s", c, client->id);
  90     return cib_common_callback(c, data, size, TRUE);
  91 }
  92 
  93 static int32_t
  94 cib_ipc_dispatch_ro(qb_ipcs_connection_t * c, void *data, size_t size)
     /* [previous][next][first][last][top][bottom][index][help] */
  95 {
  96     pcmk__client_t *client = pcmk__find_client(c);
  97 
  98     crm_trace("%p message from %s", c, client->id);
  99     return cib_common_callback(c, data, size, FALSE);
 100 }
 101 
 102 /* Error code means? */
 103 static int32_t
 104 cib_ipc_closed(qb_ipcs_connection_t * c)
     /* [previous][next][first][last][top][bottom][index][help] */
 105 {
 106     pcmk__client_t *client = pcmk__find_client(c);
 107 
 108     if (client == NULL) {
 109         return 0;
 110     }
 111     crm_trace("Connection %p", c);
 112     pcmk__free_client(client);
 113     return 0;
 114 }
 115 
 116 static void
 117 cib_ipc_destroy(qb_ipcs_connection_t * c)
     /* [previous][next][first][last][top][bottom][index][help] */
 118 {
 119     crm_trace("Connection %p", c);
 120     cib_ipc_closed(c);
 121     if (cib_shutdown_flag) {
 122         cib_shutdown(0);
 123     }
 124 }
 125 
 126 struct qb_ipcs_service_handlers ipc_ro_callbacks = {
 127     .connection_accept = cib_ipc_accept,
 128     .connection_created = NULL,
 129     .msg_process = cib_ipc_dispatch_ro,
 130     .connection_closed = cib_ipc_closed,
 131     .connection_destroyed = cib_ipc_destroy
 132 };
 133 
 134 struct qb_ipcs_service_handlers ipc_rw_callbacks = {
 135     .connection_accept = cib_ipc_accept,
 136     .connection_created = NULL,
 137     .msg_process = cib_ipc_dispatch_rw,
 138     .connection_closed = cib_ipc_closed,
 139     .connection_destroyed = cib_ipc_destroy
 140 };
 141 
 142 void
 143 cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request,
     /* [previous][next][first][last][top][bottom][index][help] */
 144                            pcmk__client_t *cib_client, gboolean privileged)
 145 {
 146     const char *op = crm_element_value(op_request, F_CIB_OPERATION);
 147 
 148     if (pcmk__str_eq(op, CRM_OP_REGISTER, pcmk__str_none)) {
 149         if (flags & crm_ipc_client_response) {
 150             xmlNode *ack = create_xml_node(NULL, __func__);
 151 
 152             crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER);
 153             crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id);
 154             pcmk__ipc_send_xml(cib_client, id, ack, flags);
 155             cib_client->request_id = 0;
 156             free_xml(ack);
 157         }
 158         return;
 159 
 160     } else if (pcmk__str_eq(op, T_CIB_NOTIFY, pcmk__str_none)) {
 161         /* Update the notify filters for this client */
 162         int on_off = 0;
 163         crm_exit_t status = CRM_EX_OK;
 164         uint64_t bit = UINT64_C(0);
 165         const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE);
 166 
 167         crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off);
 168 
 169         crm_debug("Setting %s callbacks for %s (%s): %s",
 170                   type, cib_client->name, cib_client->id, on_off ? "on" : "off");
 171 
 172         if (pcmk__str_eq(type, T_CIB_POST_NOTIFY, pcmk__str_casei)) {
 173             bit = cib_notify_post;
 174 
 175         } else if (pcmk__str_eq(type, T_CIB_PRE_NOTIFY, pcmk__str_casei)) {
 176             bit = cib_notify_pre;
 177 
 178         } else if (pcmk__str_eq(type, T_CIB_UPDATE_CONFIRM, pcmk__str_casei)) {
 179             bit = cib_notify_confirm;
 180 
 181         } else if (pcmk__str_eq(type, T_CIB_DIFF_NOTIFY, pcmk__str_casei)) {
 182             bit = cib_notify_diff;
 183 
 184         } else if (pcmk__str_eq(type, T_CIB_REPLACE_NOTIFY, pcmk__str_casei)) {
 185             bit = cib_notify_replace;
 186 
 187         } else {
 188             status = CRM_EX_INVALID_PARAM;
 189         }
 190 
 191         if (bit != 0) {
 192             if (on_off) {
 193                 pcmk__set_client_flags(cib_client, bit);
 194             } else {
 195                 pcmk__clear_client_flags(cib_client, bit);
 196             }
 197         }
 198 
 199         pcmk__ipc_send_ack(cib_client, id, flags, "ack", status);
 200         return;
 201     }
 202 
 203     cib_process_request(op_request, privileged, cib_client);
 204 }
 205 
 206 int32_t
 207 cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size, gboolean privileged)
     /* [previous][next][first][last][top][bottom][index][help] */
 208 {
 209     uint32_t id = 0;
 210     uint32_t flags = 0;
 211     int call_options = 0;
 212     pcmk__client_t *cib_client = pcmk__find_client(c);
 213     xmlNode *op_request = pcmk__client_data2xml(cib_client, data, &id, &flags);
 214 
 215     if (op_request) {
 216         crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options);
 217     }
 218 
 219     if (op_request == NULL) {
 220         crm_trace("Invalid message from %p", c);
 221         pcmk__ipc_send_ack(cib_client, id, flags, "nack", CRM_EX_PROTOCOL);
 222         return 0;
 223 
 224     } else if(cib_client == NULL) {
 225         crm_trace("Invalid client %p", c);
 226         return 0;
 227     }
 228 
 229     if (pcmk_is_set(call_options, cib_sync_call)) {
 230         CRM_LOG_ASSERT(flags & crm_ipc_client_response);
 231         CRM_LOG_ASSERT(cib_client->request_id == 0);    /* This means the client has two synchronous events in-flight */
 232         cib_client->request_id = id;    /* Reply only to the last one */
 233     }
 234 
 235     if (cib_client->name == NULL) {
 236         const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME);
 237 
 238         if (value == NULL) {
 239             cib_client->name = crm_itoa(cib_client->pid);
 240         } else {
 241             cib_client->name = strdup(value);
 242             if (crm_is_daemon_name(value)) {
 243                 pcmk__set_client_flags(cib_client, cib_is_daemon);
 244             }
 245         }
 246     }
 247 
 248     /* Allow cluster daemons more leeway before being evicted */
 249     if (pcmk_is_set(cib_client->flags, cib_is_daemon)) {
 250         const char *qmax = cib_config_lookup("cluster-ipc-limit");
 251 
 252         if (pcmk__set_client_queue_max(cib_client, qmax)) {
 253             crm_trace("IPC threshold for %s[%u] is now %u",
 254                       cib_client->name, cib_client->pid, cib_client->queue_max);
 255         }
 256     }
 257 
 258     crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id);
 259     crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name);
 260 
 261 #if ENABLE_ACL
 262     CRM_LOG_ASSERT(cib_client->user != NULL);
 263     pcmk__update_acl_user(op_request, F_CIB_USER, cib_client->user);
 264 #endif
 265 
 266     cib_common_callback_worker(id, flags, op_request, cib_client, privileged);
 267     free_xml(op_request);
 268 
 269     return 0;
 270 }
 271 
 272 static uint64_t ping_seq = 0;
 273 static char *ping_digest = NULL;
 274 static bool ping_modified_since = FALSE;
 275 int sync_our_cib(xmlNode * request, gboolean all);
 276 
 277 static gboolean
 278 cib_digester_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 279 {
 280     if (cib_is_master) {
 281         char buffer[32];
 282         xmlNode *ping = create_xml_node(NULL, "ping");
 283 
 284         ping_seq++;
 285         free(ping_digest);
 286         ping_digest = NULL;
 287         ping_modified_since = FALSE;
 288         snprintf(buffer, 32, "%" U64T, ping_seq);
 289         crm_trace("Requesting peer digests (%s)", buffer);
 290 
 291         crm_xml_add(ping, F_TYPE, "cib");
 292         crm_xml_add(ping, F_CIB_OPERATION, CRM_OP_PING);
 293         crm_xml_add(ping, F_CIB_PING_ID, buffer);
 294 
 295         crm_xml_add(ping, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);
 296         send_cluster_message(NULL, crm_msg_cib, ping, TRUE);
 297 
 298         free_xml(ping);
 299     }
 300     return FALSE;
 301 }
 302 
 303 static void
 304 process_ping_reply(xmlNode *reply) 
     /* [previous][next][first][last][top][bottom][index][help] */
 305 {
 306     uint64_t seq = 0;
 307     const char *host = crm_element_value(reply, F_ORIG);
 308 
 309     xmlNode *pong = get_message_xml(reply, F_CIB_CALLDATA);
 310     const char *seq_s = crm_element_value(pong, F_CIB_PING_ID);
 311     const char *digest = crm_element_value(pong, XML_ATTR_DIGEST);
 312 
 313     if (seq_s) {
 314         seq = (uint64_t) crm_parse_ll(seq_s, NULL);
 315     }
 316 
 317     if(digest == NULL) {
 318         crm_trace("Ignoring ping reply %s from %s with no digest", seq_s, host);
 319 
 320     } else if(seq != ping_seq) {
 321         crm_trace("Ignoring out of sequence ping reply %s from %s", seq_s, host);
 322 
 323     } else if(ping_modified_since) {
 324         crm_trace("Ignoring ping reply %s from %s: cib updated since", seq_s, host);
 325 
 326     } else {
 327         const char *version = crm_element_value(pong, XML_ATTR_CRM_VERSION);
 328 
 329         if(ping_digest == NULL) {
 330             crm_trace("Calculating new digest");
 331             ping_digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, version);
 332         }
 333 
 334         crm_trace("Processing ping reply %s from %s (%s)", seq_s, host, digest);
 335         if (!pcmk__str_eq(ping_digest, digest, pcmk__str_casei)) {
 336             xmlNode *remote_cib = get_message_xml(pong, F_CIB_CALLDATA);
 337 
 338             crm_notice("Local CIB %s.%s.%s.%s differs from %s: %s.%s.%s.%s %p",
 339                        crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN),
 340                        crm_element_value(the_cib, XML_ATTR_GENERATION),
 341                        crm_element_value(the_cib, XML_ATTR_NUMUPDATES),
 342                        ping_digest, host,
 343                        remote_cib?crm_element_value(remote_cib, XML_ATTR_GENERATION_ADMIN):"_",
 344                        remote_cib?crm_element_value(remote_cib, XML_ATTR_GENERATION):"_",
 345                        remote_cib?crm_element_value(remote_cib, XML_ATTR_NUMUPDATES):"_",
 346                        digest, remote_cib);
 347 
 348             if(remote_cib && remote_cib->children) {
 349                 /* Additional debug */
 350                 xml_calculate_changes(the_cib, remote_cib);
 351                 xml_log_changes(LOG_INFO, __func__, remote_cib);
 352                 crm_trace("End of differences");
 353             }
 354 
 355             free_xml(remote_cib);
 356             sync_our_cib(reply, FALSE);
 357         }
 358     }
 359 }
 360 
 361 static void
 362 do_local_notify(xmlNode * notify_src, const char *client_id,
     /* [previous][next][first][last][top][bottom][index][help] */
 363                 gboolean sync_reply, gboolean from_peer)
 364 {
 365     int rid = 0;
 366     int call_id = 0;
 367     pcmk__client_t *client_obj = NULL;
 368 
 369     CRM_ASSERT(notify_src && client_id);
 370 
 371     crm_element_value_int(notify_src, F_CIB_CALLID, &call_id);
 372 
 373     client_obj = pcmk__find_client_by_id(client_id);
 374     if (client_obj == NULL) {
 375         crm_debug("Could not send response %d: client %s not found",
 376                   call_id, client_id);
 377         return;
 378     }
 379 
 380     if (sync_reply) {
 381         if (client_obj->ipcs) {
 382             CRM_LOG_ASSERT(client_obj->request_id);
 383 
 384             rid = client_obj->request_id;
 385             client_obj->request_id = 0;
 386 
 387             crm_trace("Sending response %d to %s %s",
 388                       rid, client_obj->name,
 389                       from_peer ? "(originator of delegated request)" : "");
 390         } else {
 391             crm_trace("Sending response [call %d] to %s %s",
 392                       call_id, client_obj->name, from_peer ? "(originator of delegated request)" : "");
 393         }
 394 
 395     } else {
 396         crm_trace("Sending event %d to %s %s",
 397                   call_id, client_obj->name, from_peer ? "(originator of delegated request)" : "");
 398     }
 399 
 400     switch (PCMK__CLIENT_TYPE(client_obj)) {
 401         case pcmk__client_ipc:
 402             {
 403                 int rc = pcmk__ipc_send_xml(client_obj, rid, notify_src,
 404                                             (sync_reply? crm_ipc_flags_none
 405                                              : crm_ipc_server_event));
 406 
 407                 if (rc != pcmk_rc_ok) {
 408                     crm_warn("%s reply to %s failed: %s " CRM_XS " rc=%d",
 409                              (sync_reply? "Synchronous" : "Asynchronous"),
 410                              client_obj->name, pcmk_rc_str(rc), rc);
 411                 }
 412             }
 413             break;
 414 #ifdef HAVE_GNUTLS_GNUTLS_H
 415         case pcmk__client_tls:
 416 #endif
 417         case pcmk__client_tcp:
 418             pcmk__remote_send_xml(client_obj->remote, notify_src);
 419             break;
 420         default:
 421             crm_err("Unknown transport for %s " CRM_XS " flags=0x%llx",
 422                     pcmk__client_name(client_obj),
 423                     (unsigned long long) client_obj->flags);
 424     }
 425 }
 426 
 427 static void
 428 local_notify_destroy_callback(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 429 {
 430     cib_local_notify_t *notify = data;
 431 
 432     free_xml(notify->notify_src);
 433     free(notify->client_id);
 434     free(notify);
 435 }
 436 
 437 static void
 438 check_local_notify(int bcast_id)
     /* [previous][next][first][last][top][bottom][index][help] */
 439 {
 440     cib_local_notify_t *notify = NULL;
 441 
 442     if (!local_notify_queue) {
 443         return;
 444     }
 445 
 446     notify = g_hash_table_lookup(local_notify_queue, GINT_TO_POINTER(bcast_id));
 447 
 448     if (notify) {
 449         do_local_notify(notify->notify_src, notify->client_id, notify->sync_reply,
 450                         notify->from_peer);
 451         g_hash_table_remove(local_notify_queue, GINT_TO_POINTER(bcast_id));
 452     }
 453 }
 454 
 455 static void
 456 queue_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply,
     /* [previous][next][first][last][top][bottom][index][help] */
 457                    gboolean from_peer)
 458 {
 459     cib_local_notify_t *notify = calloc(1, sizeof(cib_local_notify_t));
 460 
 461     notify->notify_src = notify_src;
 462     notify->client_id = strdup(client_id);
 463     notify->sync_reply = sync_reply;
 464     notify->from_peer = from_peer;
 465 
 466     if (!local_notify_queue) {
 467         local_notify_queue = g_hash_table_new_full(g_direct_hash,
 468                                                    g_direct_equal, NULL,
 469                                                    local_notify_destroy_callback);
 470     }
 471 
 472     g_hash_table_insert(local_notify_queue, GINT_TO_POINTER(cib_local_bcast_num), notify);
 473     // cppcheck doesn't know notify will get freed when hash table is destroyed
 474     // cppcheck-suppress memleak
 475 }
 476 
 477 static void
 478 parse_local_options_v1(pcmk__client_t *cib_client, int call_type,
     /* [previous][next][first][last][top][bottom][index][help] */
 479                        int call_options, const char *host, const char *op,
 480                        gboolean *local_notify, gboolean *needs_reply,
 481                        gboolean *process, gboolean *needs_forward)
 482 {
 483     if (cib_op_modifies(call_type)
 484         && !(call_options & cib_inhibit_bcast)) {
 485         /* we need to send an update anyway */
 486         *needs_reply = TRUE;
 487     } else {
 488         *needs_reply = FALSE;
 489     }
 490 
 491     if (host == NULL && (call_options & cib_scope_local)) {
 492         crm_trace("Processing locally scoped %s op from %s", op, cib_client->name);
 493         *local_notify = TRUE;
 494 
 495     } else if (host == NULL && cib_is_master) {
 496         crm_trace("Processing master %s op locally from %s", op, cib_client->name);
 497         *local_notify = TRUE;
 498 
 499     } else if (pcmk__str_eq(host, cib_our_uname, pcmk__str_casei)) {
 500         crm_trace("Processing locally addressed %s op from %s", op, cib_client->name);
 501         *local_notify = TRUE;
 502 
 503     } else if (stand_alone) {
 504         *needs_forward = FALSE;
 505         *local_notify = TRUE;
 506         *process = TRUE;
 507 
 508     } else {
 509         crm_trace("%s op from %s needs to be forwarded to %s",
 510                   op, cib_client->name, host ? host : "the master instance");
 511         *needs_forward = TRUE;
 512         *process = FALSE;
 513     }
 514 }
 515 
 516 static void
 517 parse_local_options_v2(pcmk__client_t *cib_client, int call_type,
     /* [previous][next][first][last][top][bottom][index][help] */
 518                        int call_options, const char *host, const char *op,
 519                        gboolean *local_notify, gboolean *needs_reply,
 520                        gboolean *process, gboolean *needs_forward)
 521 {
 522     if (cib_op_modifies(call_type)) {
 523         if (pcmk__strcase_any_of(op, CIB_OP_MASTER, CIB_OP_SLAVE, NULL)) {
 524             /* Always handle these locally */
 525             *process = TRUE;
 526             *needs_reply = FALSE;
 527             *local_notify = TRUE;
 528             *needs_forward = FALSE;
 529             return;
 530 
 531         } else {
 532             /* Redirect all other updates via CPG */
 533             *needs_reply = TRUE;
 534             *needs_forward = TRUE;
 535             *process = FALSE;
 536             crm_trace("%s op from %s needs to be forwarded to %s",
 537                       op, cib_client->name, host ? host : "the master instance");
 538             return;
 539         }
 540     }
 541 
 542 
 543     *process = TRUE;
 544     *needs_reply = FALSE;
 545     *local_notify = TRUE;
 546     *needs_forward = FALSE;
 547 
 548     if (stand_alone) {
 549         crm_trace("Processing %s op from %s (stand-alone)", op, cib_client->name);
 550 
 551     } else if (host == NULL) {
 552         crm_trace("Processing unaddressed %s op from %s", op, cib_client->name);
 553 
 554     } else if (pcmk__str_eq(host, cib_our_uname, pcmk__str_casei)) {
 555         crm_trace("Processing locally addressed %s op from %s", op, cib_client->name);
 556 
 557     } else {
 558         crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host);
 559         *needs_forward = TRUE;
 560         *process = FALSE;
 561     }
 562 }
 563 
 564 static void
 565 parse_local_options(pcmk__client_t *cib_client, int call_type,
     /* [previous][next][first][last][top][bottom][index][help] */
 566                     int call_options, const char *host, const char *op,
 567                     gboolean *local_notify, gboolean *needs_reply,
 568                     gboolean *process, gboolean *needs_forward)
 569 {
 570     if(cib_legacy_mode()) {
 571         parse_local_options_v1(cib_client, call_type, call_options, host,
 572                                op, local_notify, needs_reply, process, needs_forward);
 573     } else {
 574         parse_local_options_v2(cib_client, call_type, call_options, host,
 575                                op, local_notify, needs_reply, process, needs_forward);
 576     }
 577 }
 578 
 579 static gboolean
 580 parse_peer_options_v1(int call_type, xmlNode * request,
     /* [previous][next][first][last][top][bottom][index][help] */
 581                    gboolean * local_notify, gboolean * needs_reply, gboolean * process,
 582                    gboolean * needs_forward)
 583 {
 584     const char *op = NULL;
 585     const char *host = NULL;
 586     const char *delegated = NULL;
 587     const char *originator = crm_element_value(request, F_ORIG);
 588     const char *reply_to = crm_element_value(request, F_CIB_ISREPLY);
 589     const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE);
 590 
 591     gboolean is_reply = pcmk__str_eq(reply_to, cib_our_uname, pcmk__str_casei);
 592 
 593     if (crm_is_true(update)) {
 594         *needs_reply = FALSE;
 595         if (is_reply) {
 596             *local_notify = TRUE;
 597             crm_trace("Processing global/peer update from %s"
 598                       " that originated from us", originator);
 599         } else {
 600             crm_trace("Processing global/peer update from %s", originator);
 601         }
 602         return TRUE;
 603     }
 604 
 605     op = crm_element_value(request, F_CIB_OPERATION);
 606     crm_trace("Processing %s request sent by %s", op, originator);
 607     if (pcmk__str_eq(op, "cib_shutdown_req", pcmk__str_casei)) {
 608         /* Always process these */
 609         *local_notify = FALSE;
 610         if (reply_to == NULL || is_reply) {
 611             *process = TRUE;
 612         }
 613         if (is_reply) {
 614             *needs_reply = FALSE;
 615         }
 616         return *process;
 617     }
 618 
 619     if (is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
 620         process_ping_reply(request);
 621         return FALSE;
 622     }
 623 
 624     if (is_reply) {
 625         crm_trace("Forward reply sent from %s to local clients", originator);
 626         *process = FALSE;
 627         *needs_reply = FALSE;
 628         *local_notify = TRUE;
 629         return TRUE;
 630     }
 631 
 632     host = crm_element_value(request, F_CIB_HOST);
 633     if (host != NULL && pcmk__str_eq(host, cib_our_uname, pcmk__str_casei)) {
 634         crm_trace("Processing %s request sent to us from %s", op, originator);
 635         return TRUE;
 636 
 637     } else if(is_reply == FALSE && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
 638         crm_trace("Processing %s request sent to %s by %s", op, host?host:"everyone", originator);
 639         *needs_reply = TRUE;
 640         return TRUE;
 641 
 642     } else if (host == NULL && cib_is_master == TRUE) {
 643         crm_trace("Processing %s request sent to master instance from %s", op, originator);
 644         return TRUE;
 645     }
 646 
 647     delegated = crm_element_value(request, F_CIB_DELEGATED);
 648     if (delegated != NULL) {
 649         crm_trace("Ignoring msg for master instance");
 650 
 651     } else if (host != NULL) {
 652         /* this is for a specific instance and we're not it */
 653         crm_trace("Ignoring msg for instance on %s", crm_str(host));
 654 
 655     } else if (reply_to == NULL && cib_is_master == FALSE) {
 656         /* this is for the master instance and we're not it */
 657         crm_trace("Ignoring reply to %s", crm_str(reply_to));
 658 
 659     } else if (pcmk__str_eq(op, "cib_shutdown_req", pcmk__str_casei)) {
 660         if (reply_to != NULL) {
 661             crm_debug("Processing %s from %s", op, originator);
 662             *needs_reply = FALSE;
 663 
 664         } else {
 665             crm_debug("Processing %s reply from %s", op, originator);
 666         }
 667         return TRUE;
 668 
 669     } else {
 670         crm_err("Nothing for us to do?");
 671         crm_log_xml_err(request, "Peer[inbound]");
 672     }
 673 
 674     return FALSE;
 675 }
 676 
 677 static gboolean
 678 parse_peer_options_v2(int call_type, xmlNode * request,
     /* [previous][next][first][last][top][bottom][index][help] */
 679                    gboolean * local_notify, gboolean * needs_reply, gboolean * process,
 680                    gboolean * needs_forward)
 681 {
 682     const char *host = NULL;
 683     const char *delegated = crm_element_value(request, F_CIB_DELEGATED);
 684     const char *op = crm_element_value(request, F_CIB_OPERATION);
 685     const char *originator = crm_element_value(request, F_ORIG);
 686     const char *reply_to = crm_element_value(request, F_CIB_ISREPLY);
 687     const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE);
 688 
 689     gboolean is_reply = pcmk__str_eq(reply_to, cib_our_uname, pcmk__str_casei);
 690 
 691     if(pcmk__str_eq(op, CIB_OP_REPLACE, pcmk__str_casei)) {
 692         /* sync_our_cib() sets F_CIB_ISREPLY */
 693         if (reply_to) {
 694             delegated = reply_to;
 695         }
 696         goto skip_is_reply;
 697 
 698     } else if(pcmk__str_eq(op, CIB_OP_SYNC, pcmk__str_casei)) {
 699 
 700     } else if (is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
 701         process_ping_reply(request);
 702         return FALSE;
 703 
 704     } else if (pcmk__str_eq(op, CIB_OP_UPGRADE, pcmk__str_casei)) {
 705         /* Only the DC (node with the oldest software) should process
 706          * this operation if F_CIB_SCHEMA_MAX is unset
 707          *
 708          * If the DC is happy it will then send out another
 709          * CIB_OP_UPGRADE which will tell all nodes to do the actual
 710          * upgrade.
 711          *
 712          * Except this time F_CIB_SCHEMA_MAX will be set which puts a
 713          * limit on how far newer nodes will go
 714          */
 715         const char *max = crm_element_value(request, F_CIB_SCHEMA_MAX);
 716         const char *upgrade_rc = crm_element_value(request, F_CIB_UPGRADE_RC);
 717 
 718         crm_trace("Parsing %s operation%s for %s with max=%s and upgrade_rc=%s",
 719                   op, (is_reply? " reply" : ""),
 720                   (cib_is_master? "master" : "slave"),
 721                   (max? max : "none"), (upgrade_rc? upgrade_rc : "none"));
 722 
 723         if (upgrade_rc != NULL) {
 724             // Our upgrade request was rejected by DC, notify clients of result
 725             crm_xml_add(request, F_CIB_RC, upgrade_rc);
 726 
 727         } else if ((max == NULL) && cib_is_master) {
 728             /* We are the DC, check if this upgrade is allowed */
 729             goto skip_is_reply;
 730 
 731         } else if(max) {
 732             /* Ok, go ahead and upgrade to 'max' */
 733             goto skip_is_reply;
 734 
 735         } else {
 736             // Ignore broadcast client requests when we're not DC
 737             return FALSE;
 738         }
 739 
 740     } else if (crm_is_true(update)) {
 741         crm_info("Detected legacy %s global update from %s", op, originator);
 742         send_sync_request(NULL);
 743         legacy_mode = TRUE;
 744         return FALSE;
 745 
 746     } else if (is_reply && cib_op_modifies(call_type)) {
 747         crm_trace("Ignoring legacy %s reply sent from %s to local clients", op, originator);
 748         return FALSE;
 749 
 750     } else if (pcmk__str_eq(op, "cib_shutdown_req", pcmk__str_casei)) {
 751         /* Legacy handling */
 752         crm_debug("Legacy handling of %s message from %s", op, originator);
 753         *local_notify = FALSE;
 754         if (reply_to == NULL) {
 755             *process = TRUE;
 756         }
 757         return *process;
 758     }
 759 
 760     if(is_reply) {
 761         crm_trace("Handling %s reply sent from %s to local clients", op, originator);
 762         *process = FALSE;
 763         *needs_reply = FALSE;
 764         *local_notify = TRUE;
 765         return TRUE;
 766     }
 767 
 768   skip_is_reply:
 769     *process = TRUE;
 770     *needs_reply = FALSE;
 771 
 772     if(pcmk__str_eq(delegated, cib_our_uname, pcmk__str_casei)) {
 773         *local_notify = TRUE;
 774     } else {
 775         *local_notify = FALSE;
 776     }
 777 
 778     host = crm_element_value(request, F_CIB_HOST);
 779     if (host != NULL && pcmk__str_eq(host, cib_our_uname, pcmk__str_casei)) {
 780         crm_trace("Processing %s request sent to us from %s", op, originator);
 781         *needs_reply = TRUE;
 782         return TRUE;
 783 
 784     } else if (host != NULL) {
 785         /* this is for a specific instance and we're not it */
 786         crm_trace("Ignoring %s operation for instance on %s", op, crm_str(host));
 787         return FALSE;
 788 
 789     } else if(is_reply == FALSE && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
 790         *needs_reply = TRUE;
 791     }
 792 
 793     crm_trace("Processing %s request sent to everyone by %s/%s on %s %s", op,
 794               crm_element_value(request, F_CIB_CLIENTNAME),
 795               crm_element_value(request, F_CIB_CALLID),
 796               originator, (*local_notify)?"(notify)":"");
 797     return TRUE;
 798 }
 799 
 800 static gboolean
 801 parse_peer_options(int call_type, xmlNode * request,
     /* [previous][next][first][last][top][bottom][index][help] */
 802                    gboolean * local_notify, gboolean * needs_reply, gboolean * process,
 803                    gboolean * needs_forward)
 804 {
 805     /* TODO: What happens when an update comes in after node A
 806      * requests the CIB from node B, but before it gets the reply (and
 807      * sends out the replace operation)
 808      */
 809     if(cib_legacy_mode()) {
 810         return parse_peer_options_v1(
 811             call_type, request, local_notify, needs_reply, process, needs_forward);
 812     } else {
 813         return parse_peer_options_v2(
 814             call_type, request, local_notify, needs_reply, process, needs_forward);
 815     }
 816 }
 817 
 818 static void
 819 forward_request(xmlNode * request, pcmk__client_t *cib_client, int call_options)
     /* [previous][next][first][last][top][bottom][index][help] */
 820 {
 821     const char *op = crm_element_value(request, F_CIB_OPERATION);
 822     const char *host = crm_element_value(request, F_CIB_HOST);
 823 
 824     crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname);
 825 
 826     if (host != NULL) {
 827         crm_trace("Forwarding %s op to %s", op, host);
 828         send_cluster_message(crm_get_peer(0, host), crm_msg_cib, request, FALSE);
 829 
 830     } else {
 831         crm_trace("Forwarding %s op to master instance", op);
 832         send_cluster_message(NULL, crm_msg_cib, request, FALSE);
 833     }
 834 
 835     /* Return the request to its original state */
 836     xml_remove_prop(request, F_CIB_DELEGATED);
 837 
 838     if (call_options & cib_discard_reply) {
 839         crm_trace("Client not interested in reply");
 840     }
 841 }
 842 
 843 static gboolean
 844 send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast)
     /* [previous][next][first][last][top][bottom][index][help] */
 845 {
 846     CRM_ASSERT(msg != NULL);
 847 
 848     if (broadcast) {
 849         /* this (successful) call modified the CIB _and_ the
 850          * change needs to be broadcast...
 851          *   send via HA to other nodes
 852          */
 853         int diff_add_updates = 0;
 854         int diff_add_epoch = 0;
 855         int diff_add_admin_epoch = 0;
 856 
 857         int diff_del_updates = 0;
 858         int diff_del_epoch = 0;
 859         int diff_del_admin_epoch = 0;
 860 
 861         const char *digest = NULL;
 862         int format = 1;
 863 
 864         CRM_LOG_ASSERT(result_diff != NULL);
 865         digest = crm_element_value(result_diff, XML_ATTR_DIGEST);
 866         crm_element_value_int(result_diff, "format", &format);
 867 
 868         cib_diff_version_details(result_diff,
 869                                  &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates,
 870                                  &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates);
 871 
 872         crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d %s",
 873                   diff_del_admin_epoch, diff_del_epoch, diff_del_updates,
 874                   diff_add_admin_epoch, diff_add_epoch, diff_add_updates, digest);
 875 
 876         crm_xml_add(msg, F_CIB_ISREPLY, originator);
 877         crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE);
 878         crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF);
 879         crm_xml_add(msg, F_CIB_USER, CRM_DAEMON_USER);
 880 
 881         if (format == 1) {
 882             CRM_ASSERT(digest != NULL);
 883         }
 884 
 885         add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff);
 886         crm_log_xml_explicit(msg, "copy");
 887         return send_cluster_message(NULL, crm_msg_cib, msg, TRUE);
 888 
 889     } else if (originator != NULL) {
 890         /* send reply via HA to originating node */
 891         crm_trace("Sending request result to %s only", originator);
 892         crm_xml_add(msg, F_CIB_ISREPLY, originator);
 893         return send_cluster_message(crm_get_peer(0, originator), crm_msg_cib, msg, FALSE);
 894     }
 895 
 896     return FALSE;
 897 }
 898 
 899 /*!
 900  * \internal
 901  * \brief Handle an IPC or CPG message containing a request
 902  *
      * Depending on the flags chosen by parse_local_options() or
      * parse_peer_options(), the request is forwarded to another node, run
      * locally through cib_process_command(), or left unprocessed. Afterwards
      * any peer reply or broadcast is sent, the local requester is notified if
      * required, and the reply and diff XML are freed.
      *
 903  * \param[in] request            Request XML
 904  * \param[in] privileged         Whether privileged commands may be run
 905  *                               (see cib_server_ops[] definition)
 906  * \param[in] cib_client         IPC client that sent request (or NULL if CPG)
      *
      * \note \p request is modified in place: F_ORIG is set for requests from
      *       local clients, and the legacy broadcast path adds
      *       F_CIB_LOCAL_NOTIFY_ID before handing it to send_peer_reply().
 907  */
 908 static void
 909 cib_process_request(xmlNode *request, gboolean privileged,
     /* [previous][next][first][last][top][bottom][index][help] */
 910                     pcmk__client_t *cib_client)
 911 {
 912     int call_type = 0;
 913     int call_options = 0;
 914 
 915     gboolean process = TRUE;        // Whether to process request locally now
 916     gboolean is_update = TRUE;      // Whether request would modify CIB
 917     gboolean needs_reply = TRUE;    // Whether to build a reply
 918     gboolean local_notify = FALSE;  // Whether to notify (local) requester
 919     gboolean needs_forward = FALSE; // Whether to forward request somewhere else
 920     gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE));
 921 
 922     xmlNode *op_reply = NULL;
 923     xmlNode *result_diff = NULL;
 924 
 925     int rc = pcmk_ok;
 926     const char *op = crm_element_value(request, F_CIB_OPERATION);
 927     const char *originator = crm_element_value(request, F_ORIG);
 928     const char *host = crm_element_value(request, F_CIB_HOST);
 929     const char *target = NULL;
 930     const char *call_id = crm_element_value(request, F_CIB_CALLID);
 931     const char *client_id = crm_element_value(request, F_CIB_CLIENTID);
 932     const char *client_name = crm_element_value(request, F_CIB_CLIENTNAME);
 933     const char *reply_to = crm_element_value(request, F_CIB_ISREPLY);
 934 
 935     crm_element_value_int(request, F_CIB_CALLOPTS, &call_options);
 936 
         /* An empty host name is treated the same as no host */
 937     if ((host != NULL) && (*host == '\0')) {
 938         host = NULL;
 939     }
 940 
         /* "target" is only used to describe the destination in trace messages */
 941     if (host) {
 942         target = host;
 943 
 944     } else if (call_options & cib_scope_local) {
 945         target = "local host";
 946 
 947     } else {
 948         target = "master";
 949     }
 950 
 951     if (cib_client == NULL) {
 952         crm_trace("Processing peer %s operation from %s/%s on %s intended for %s (reply=%s)",
 953                   op, client_name, call_id, originator, target, reply_to);
 954     } else {
             // Requests from local clients are marked as originating on this node
 955         crm_xml_add(request, F_ORIG, cib_our_uname);
 956         crm_trace("Processing local %s operation from %s/%s intended for %s", op, client_name, call_id, target);
 957     }
 958 
         /* Translate the operation name into a call type; on failure the request
          * is logged and dropped without a reply
          */
 959     rc = cib_get_operation_id(op, &call_type);
 960     if (rc != pcmk_ok) {
 961         /* TODO: construct error reply? */
 962         crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc));
 963         return;
 964     }
 965 
         /* Work out what to do with the request. For peer requests, a FALSE
          * return from the parser means there is nothing to do here.
          */
 966     if (cib_client != NULL) {
 967         parse_local_options(cib_client, call_type, call_options, host, op,
 968                             &local_notify, &needs_reply, &process, &needs_forward);
 969 
 970     } else if (parse_peer_options(call_type, request, &local_notify,
 971                                   &needs_reply, &process, &needs_forward) == FALSE) {
 972         return;
 973     }
 974 
 975     is_update = cib_op_modifies(call_type);
 976 
 977     if (call_options & cib_discard_reply) {
 978         /* If the request will modify the CIB, and we are in legacy mode, we
 979          * need to build a reply so we can broadcast a diff, even if the
 980          * requester doesn't want one.
 981          */
 982         needs_reply = is_update && cib_legacy_mode();
 983         local_notify = FALSE;
 984     }
 985 
         /* Forwarded requests are not processed any further here */
 986     if (needs_forward) {
 987         const char *section = crm_element_value(request, F_CIB_SECTION);
 988         int log_level = LOG_INFO;
 989 
 990         if (pcmk__str_eq(op, CRM_OP_NOOP, pcmk__str_casei)) {
 991             log_level = LOG_DEBUG;
 992         }
 993 
 994         do_crm_log(log_level,
 995                    "Forwarding %s operation for section %s to %s (origin=%s/%s/%s)",
 996                    op,
 997                    section ? section : "'all'",
 998                    host ? host : cib_legacy_mode() ? "master" : "all",
 999                    originator ? originator : "local",
1000                    client_name, call_id);
1001 
1002         forward_request(request, cib_client, call_options);
1003         return;
1004     }
1005 
         /* When cib_status is not pcmk_ok the operation is not run; a reply with
          * that status code and the current CIB attached is built instead
          */
1006     if (cib_status != pcmk_ok) {
1007         const char *call = crm_element_value(request, F_CIB_CALLID);
1008 
1009         rc = cib_status;
1010         crm_err("Operation ignored, cluster configuration is invalid."
1011                 " Please repair and restart: %s", pcmk_strerror(cib_status));
1012 
1013         op_reply = create_xml_node(NULL, "cib-reply");
1014         crm_xml_add(op_reply, F_TYPE, T_CIB);
1015         crm_xml_add(op_reply, F_CIB_OPERATION, op);
1016         crm_xml_add(op_reply, F_CIB_CALLID, call);
1017         crm_xml_add(op_reply, F_CIB_CLIENTID, client_id);
1018         crm_xml_add_int(op_reply, F_CIB_CALLOPTS, call_options);
1019         crm_xml_add_int(op_reply, F_CIB_RC, rc);
1020 
1021         crm_trace("Attaching reply output");
1022         add_message_xml(op_reply, F_CIB_CALLDATA, the_cib);
1023 
1024         crm_log_xml_explicit(op_reply, "cib:reply");
1025 
1026     } else if (process) {
1027         time_t finished = 0;
1028         time_t now = time(NULL);
1029         int level = LOG_INFO;
1030         const char *section = crm_element_value(request, F_CIB_SECTION);
1031 
1032         rc = cib_process_command(request, &op_reply, &result_diff, privileged);
1033 
             /* Pick the log level of the completion message from the kind of
              * operation and its result
              */
1034         if (!is_update) {
1035             level = LOG_TRACE;
1036 
1037         } else if (global_update) {
1038             switch (rc) {
1039                 case pcmk_ok:
1040                     level = LOG_INFO;
1041                     break;
1042                 case -pcmk_err_old_data:
1043                 case -pcmk_err_diff_resync:
1044                 case -pcmk_err_diff_failed:
1045                     level = LOG_TRACE;
1046                     break;
1047                 default:
1048                     level = LOG_ERR;
1049             }
1050 
1051         } else if (rc != pcmk_ok) {
1052             level = LOG_WARNING;
1053         }
1054 
1055         do_crm_log(level,
1056                    "Completed %s operation for section %s: %s (rc=%d, origin=%s/%s/%s, version=%s.%s.%s)",
1057                    op, section ? section : "'all'", pcmk_strerror(rc), rc,
1058                    originator ? originator : "local", client_name, call_id,
1059                    the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0",
1060                    the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0",
1061                    the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0");
1062 
             // Operations that took more than 3 seconds also write out the blackbox
1063         finished = time(NULL);
1064         if ((finished - now) > 3) {
1065             crm_trace("%s operation took %lds to complete", op, (long)(finished - now));
1066             crm_write_blackbox(0, NULL);
1067         }
1068 
             /* Without a reply there is nothing to send back or notify with */
1069         if (op_reply == NULL && (needs_reply || local_notify)) {
1070             crm_err("Unexpected NULL reply to message");
1071             crm_log_xml_err(request, "null reply");
1072             needs_reply = FALSE;
1073             local_notify = FALSE;
1074         }
1075     }
1076 
         /* Decide whether anything has to be sent to the cluster */
1077     if (is_update && !cib_legacy_mode()) {
1078         crm_trace("Completed pre-sync update from %s/%s/%s%s",
1079                   originator ? originator : "local", client_name, call_id,
1080                   local_notify?" with local notification":"");
1081 
1082     } else if (!needs_reply || stand_alone) {
1083         // This was a non-originating slave update
1084         crm_trace("Completed slave update");
1085 
1086     } else if (cib_legacy_mode() &&
1087                rc == pcmk_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) {
1088         gboolean broadcast = FALSE;
1089 
1090         cib_local_bcast_num++;
1091         crm_xml_add_int(request, F_CIB_LOCAL_NOTIFY_ID, cib_local_bcast_num);
1092         broadcast = send_peer_reply(request, result_diff, originator, TRUE);
1093 
1094         if (broadcast && client_id && local_notify && op_reply) {
1095 
1096             /* If we have been asked to sync the reply,
1097              * and a bcast msg has gone out, we queue the local notify
1098              * until we know the bcast message has been received */
1099             local_notify = FALSE;
1100             crm_trace("Queuing local %ssync notification for %s",
1101                       (call_options & cib_sync_call) ? "" : "a-", client_id);
1102 
1103             queue_local_notify(op_reply, client_id,
1104                                pcmk_is_set(call_options, cib_sync_call),
1105                                (cib_client == NULL));
1106             op_reply = NULL;    /* the reply is queued, so don't free here */
1107         }
1108 
1109     } else if (call_options & cib_discard_reply) {
1110         crm_trace("Caller isn't interested in reply");
1111 
1112     } else if (cib_client == NULL) {
1113         if (is_update == FALSE || result_diff == NULL) {
1114             crm_trace("Request not broadcast: R/O call");
1115 
1116         } else if (call_options & cib_inhibit_bcast) {
1117             crm_trace("Request not broadcast: inhibited");
1118 
1119         } else if (rc != pcmk_ok) {
1120             crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc));
1121 
1122         } else {
1123             crm_trace("Directing reply to %s", originator);
1124         }
1125 
             // The traces above only explain why; the reply is sent non-broadcast in every case
1126         send_peer_reply(op_reply, result_diff, originator, FALSE);
1127     }
1128 
1129     if (local_notify && client_id) {
1130         crm_trace("Performing local %ssync notification for %s",
1131                   (pcmk_is_set(call_options, cib_sync_call)? "" : "a"),
1132                   client_id);
             /* If the request was not processed locally, the request itself
              * (rather than op_reply) is passed on to the client
              */
1133         if (process == FALSE) {
1134             do_local_notify(request, client_id,
1135                             pcmk_is_set(call_options, cib_sync_call),
1136                             (cib_client == NULL));
1137         } else {
1138             do_local_notify(op_reply, client_id,
1139                             pcmk_is_set(call_options, cib_sync_call),
1140                             (cib_client == NULL));
1141         }
1142     }
1143 
1144     free_xml(op_reply);
1145     free_xml(result_diff);
1146 
1147     return;
1148 }
1149 
     /*!
      * \internal
      * \brief Run one CIB operation and build the reply for it
      *
      * Read-only operations are executed against the current CIB. Modifying
      * operations produce a result CIB and a diff; unless this is a dry run the
      * result is activated, notifications are sent, and the "digester" timer is
      * restarted.
      *
      * \param[in]  request     Request XML (operation, call options, call ID)
      * \param[out] reply       Set to newly created reply XML, or left NULL when
      *                         the caller asked to discard the reply and legacy
      *                         mode is off
      * \param[out] cib_diff    Set to the diff produced by a modifying operation
      *                         (stays NULL for read-only operations)
      * \param[in]  privileged  If FALSE, cib_op_can_run() is consulted before the
      *                         operation is attempted
      *
      * \return pcmk_ok on success, otherwise the return code of the step that
      *         failed (for example -pcmk_err_schema_validation)
      * \note Asserts that cib_status is pcmk_ok.
      */
1150 static int
1151 cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged)
     /* [previous][next][first][last][top][bottom][index][help] */
1152 {
1153     xmlNode *input = NULL;
1154     xmlNode *output = NULL;
1155     xmlNode *result_cib = NULL;
1156     xmlNode *current_cib = NULL;
1157 
1158     int call_type = 0;
1159     int call_options = 0;
1160 
1161     const char *op = NULL;
1162     const char *section = NULL;
1163     const char *call_id = crm_element_value(request, F_CIB_CALLID);
1164 
1165     int rc = pcmk_ok;
1166     int rc2 = pcmk_ok;
1167 
1168     gboolean send_r_notify = FALSE;
1169     gboolean global_update = FALSE;
1170     gboolean config_changed = FALSE;
1171     gboolean manage_counters = TRUE;
1172 
1173     static mainloop_timer_t *digest_timer = NULL;
1174 
1175     CRM_ASSERT(cib_status == pcmk_ok);
1176 
         /* Create the "digester" timer on first use; it is restarted further
          * down each time a change has been activated
          */
1177     if(digest_timer == NULL) {
1178         digest_timer = mainloop_timer_add("digester", 5000, FALSE, cib_digester_cb, NULL);
1179     }
1180 
1181     *reply = NULL;
1182     *cib_diff = NULL;
1183     current_cib = the_cib;
1184 
1185     /* Start processing the request... */
1186     op = crm_element_value(request, F_CIB_OPERATION);
1187     crm_element_value_int(request, F_CIB_CALLOPTS, &call_options);
1188     rc = cib_get_operation_id(op, &call_type);
1189 
         /* NOTE(review): global_update is still FALSE at this point; it is only
          * read from the request further down
          */
1190     if (rc == pcmk_ok && privileged == FALSE) {
1191         rc = cib_op_can_run(call_type, call_options, privileged, global_update);
1192     }
1193 
         /* Preparation always runs; its result only replaces rc if nothing
          * failed earlier
          */
1194     rc2 = cib_op_prepare(call_type, request, &input, &section);
1195     if (rc == pcmk_ok) {
1196         rc = rc2;
1197     }
1198 
1199     if (rc != pcmk_ok) {
1200         crm_trace("Call setup failed: %s", pcmk_strerror(rc));
1201         goto done;
1202 
1203     } else if (cib_op_modifies(call_type) == FALSE) {
             /* Read-only operation: no diff is requested and no result CIB is
              * expected back
              */
1204         rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE,
1205                             section, request, input, FALSE, &config_changed,
1206                             current_cib, &result_cib, NULL, &output);
1207 
1208         CRM_CHECK(result_cib == NULL, free_xml(result_cib));
1209         goto done;
1210     }
1211 
1212     /* Handle a valid write action */
1213     global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE));
1214     if (global_update) {
1215         /* legacy code */
1216         manage_counters = FALSE;
1217         cib__set_call_options(call_options, "call", cib_force_diff);
1218         crm_trace("Global update detected");
1219 
             /* NOTE(review): 3 and 4 are hard-coded call types whose meaning is
              * not visible here -- confirm against cib_server_ops[]
              */
1220         CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type);
1221                   crm_log_xml_err(request, "bad op"));
1222     }
1223 
1224     if (rc == pcmk_ok) {
1225         ping_modified_since = TRUE;
1226         if (call_options & cib_inhibit_bcast) {
1227             /* skip */
1228             crm_trace("Skipping update: inhibit broadcast");
1229             manage_counters = FALSE;
1230         }
1231 
             /* Zero-copy is enabled only for non-dry-run changes to the status
              * section, and explicitly cleared for everything else
              */
1232         if (!pcmk_is_set(call_options, cib_dryrun)
1233             && pcmk__str_eq(section, XML_CIB_TAG_STATUS, pcmk__str_casei)) {
1234             /* Copying large CIBs accounts for a huge percentage of our CIB usage */
1235             cib__set_call_options(call_options, "call", cib_zero_copy);
1236         } else {
1237             cib__clear_call_options(call_options, "call", cib_zero_copy);
1238         }
1239 
1240         /* result_cib must not be modified after cib_perform_op() returns */
1241         rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE,
1242                             section, request, input, manage_counters, &config_changed,
1243                             current_cib, &result_cib, cib_diff, &output);
1244 
1245         if (manage_counters == FALSE) {
1246             int format = 1;
1247             /* Legacy code
1248              * If the diff is NULL at this point, it's because nothing changed
1249              */
1250             if (*cib_diff) {
1251                 crm_element_value_int(*cib_diff, "format", &format);
1252             }
1253 
1254             if (format == 1) {
1255                 config_changed = cib_config_changed(NULL, NULL, cib_diff);
1256             }
1257         }
1258 
1259         /* Always write to disk for replace ops,
1260          * this also negates the need to detect ordering changes
1261          */
1262         if (pcmk__str_eq(CIB_OP_REPLACE, op, pcmk__str_none)) {
1263             config_changed = TRUE;
1264         }
1265     }
1266 
1267     if (rc == pcmk_ok && !pcmk_is_set(call_options, cib_dryrun)) {
1268         crm_trace("Activating %s->%s%s%s",
1269                   crm_element_value(current_cib, XML_ATTR_NUMUPDATES),
1270                   crm_element_value(result_cib, XML_ATTR_NUMUPDATES),
1271                   (pcmk_is_set(call_options, cib_zero_copy)? " zero-copy" : ""),
1272                   (config_changed? " changed" : ""));
1273         if (!pcmk_is_set(call_options, cib_zero_copy)) {
1274             rc = activateCibXml(result_cib, config_changed, op);
1275             crm_trace("Activated %s (%d)",
1276                       crm_element_value(current_cib, XML_ATTR_NUMUPDATES), rc);
1277         }
1278 
1279         if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) {
1280             cib_read_config(config_hash, result_cib);
1281         }
1282 
             /* A replace notification is sent for erase operations and for
              * replace operations on the sections listed below (or on no
              * section at all)
              */
1283         if (pcmk__str_eq(CIB_OP_REPLACE, op, pcmk__str_none)) {
1284             if (section == NULL) {
1285                 send_r_notify = TRUE;
1286 
1287             } else if (pcmk__str_eq(section, XML_TAG_CIB, pcmk__str_casei)) {
1288                 send_r_notify = TRUE;
1289 
1290             } else if (pcmk__str_eq(section, XML_CIB_TAG_NODES, pcmk__str_casei)) {
1291                 send_r_notify = TRUE;
1292 
1293             } else if (pcmk__str_eq(section, XML_CIB_TAG_STATUS, pcmk__str_casei)) {
1294                 send_r_notify = TRUE;
1295 
1296             } else if (pcmk__str_eq(section, XML_CIB_TAG_CONFIGURATION, pcmk__str_casei)) {
1297                 send_r_notify = TRUE;
1298             }
1299 
1300         } else if (pcmk__str_eq(CIB_OP_ERASE, op, pcmk__str_none)) {
1301             send_r_notify = TRUE;
1302         }
1303 
1304         mainloop_timer_stop(digest_timer);
1305         mainloop_timer_start(digest_timer);
1306 
1307     } else if (rc == -pcmk_err_schema_validation) {
1308         CRM_ASSERT(!pcmk_is_set(call_options, cib_zero_copy));
1309 
1310         if (output != NULL) {
1311             crm_log_xml_info(output, "cib:output");
1312             free_xml(output);
1313         }
1314 
             // The CIB that failed validation becomes the output attached to the reply
1315         output = result_cib;
1316 
1317     } else {
1318         crm_trace("Not activating %d %d %s", rc,
1319                   pcmk_is_set(call_options, cib_dryrun),
1320                   crm_element_value(result_cib, XML_ATTR_NUMUPDATES));
             /* NOTE(review): presumably with zero-copy result_cib is not a
              * separate copy that could be freed here -- confirm
              */
1321         if (!pcmk_is_set(call_options, cib_zero_copy)) {
1322             free_xml(result_cib);
1323         }
1324     }
1325 
1326     if ((call_options & (cib_inhibit_notify|cib_dryrun)) == 0) {
1327         const char *client = crm_element_value(request, F_CIB_CLIENTNAME);
1328 
1329         crm_trace("Sending notifications %d",
1330                   pcmk_is_set(call_options, cib_dryrun));
1331         cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff);
1332     }
1333 
1334     if (send_r_notify) {
1335         const char *origin = crm_element_value(request, F_ORIG);
1336 
1337         cib_replace_notify(origin, the_cib, rc, *cib_diff);
1338     }
1339 
1340     xml_log_patchset(LOG_TRACE, "cib:diff", *cib_diff);
1341   done:
         /* A reply is built unless the caller asked to discard it; in legacy
          * mode one is always built
          */
1342     if (!pcmk_is_set(call_options, cib_discard_reply) || cib_legacy_mode()) {
1343         const char *caller = crm_element_value(request, F_CIB_CLIENTID);
1344 
1345         *reply = create_xml_node(NULL, "cib-reply");
1346         crm_xml_add(*reply, F_TYPE, T_CIB);
1347         crm_xml_add(*reply, F_CIB_OPERATION, op);
1348         crm_xml_add(*reply, F_CIB_CALLID, call_id);
1349         crm_xml_add(*reply, F_CIB_CLIENTID, caller);
1350         crm_xml_add_int(*reply, F_CIB_CALLOPTS, call_options);
1351         crm_xml_add_int(*reply, F_CIB_RC, rc);
1352 
1353         if (output != NULL) {
1354             crm_trace("Attaching reply output");
1355             add_message_xml(*reply, F_CIB_CALLDATA, output);
1356         }
1357 
1358         crm_log_xml_explicit(*reply, "cib:reply");
1359     }
1360 
1361     crm_trace("cleanup");
1362 
         /* For read-only operations, free the output unless it is the live CIB
          * itself
          */
1363     if (cib_op_modifies(call_type) == FALSE && output != current_cib) {
1364         free_xml(output);
1365         output = NULL;
1366     }
1367 
1368     if (call_type >= 0) {
1369         cib_op_cleanup(call_type, call_options, &input, &output);
1370     }
1371 
1372     crm_trace("done");
1373     return rc;
1374 }
1375 
1376 void
1377 cib_peer_callback(xmlNode * msg, void *private_data)
     /* [previous][next][first][last][top][bottom][index][help] */
1378 {
1379     const char *reason = NULL;
1380     const char *originator = crm_element_value(msg, F_ORIG);
1381 
1382     if (cib_legacy_mode() && pcmk__str_eq(originator, cib_our_uname, pcmk__str_null_matches)) {
1383         /* message is from ourselves */
1384         int bcast_id = 0;
1385 
1386         if (!(crm_element_value_int(msg, F_CIB_LOCAL_NOTIFY_ID, &bcast_id))) {
1387             check_local_notify(bcast_id);
1388         }
1389         return;
1390 
1391     } else if (crm_peer_cache == NULL) {
1392         reason = "membership not established";
1393         goto bail;
1394     }
1395 
1396     if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) {
1397         crm_xml_add(msg, F_CIB_CLIENTNAME, originator);
1398     }
1399 
1400     /* crm_log_xml_trace("Peer[inbound]", msg); */
1401     cib_process_request(msg, TRUE, NULL);
1402     return;
1403 
1404   bail:
1405     if (reason) {
1406         const char *seq = crm_element_value(msg, F_SEQ);
1407         const char *op = crm_element_value(msg, F_CIB_OPERATION);
1408 
1409         crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason);
1410     }
1411 }
1412 
1413 static gboolean
1414 cib_force_exit(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
1415 {
1416     crm_notice("Forcing exit!");
1417     terminate_cib(__func__, CRM_EX_ERROR);
1418     return FALSE;
1419 }
1420 
1421 static void
1422 disconnect_remote_client(gpointer key, gpointer value, gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
1423 {
1424     pcmk__client_t *a_client = value;
1425 
1426     crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name));
1427 }
1428 
1429 void
1430 cib_shutdown(int nsig)
     /* [previous][next][first][last][top][bottom][index][help] */
1431 {
1432     struct qb_ipcs_stats srv_stats;
1433 
1434     if (cib_shutdown_flag == FALSE) {
1435         int disconnects = 0;
1436         qb_ipcs_connection_t *c = NULL;
1437 
1438         cib_shutdown_flag = TRUE;
1439 
1440         c = qb_ipcs_connection_first_get(ipcs_rw);
1441         while (c != NULL) {
1442             qb_ipcs_connection_t *last = c;
1443 
1444             c = qb_ipcs_connection_next_get(ipcs_rw, last);
1445 
1446             crm_debug("Disconnecting r/w client %p...", last);
1447             qb_ipcs_disconnect(last);
1448             qb_ipcs_connection_unref(last);
1449             disconnects++;
1450         }
1451 
1452         c = qb_ipcs_connection_first_get(ipcs_ro);
1453         while (c != NULL) {
1454             qb_ipcs_connection_t *last = c;
1455 
1456             c = qb_ipcs_connection_next_get(ipcs_ro, last);
1457 
1458             crm_debug("Disconnecting r/o client %p...", last);
1459             qb_ipcs_disconnect(last);
1460             qb_ipcs_connection_unref(last);
1461             disconnects++;
1462         }
1463 
1464         c = qb_ipcs_connection_first_get(ipcs_shm);
1465         while (c != NULL) {
1466             qb_ipcs_connection_t *last = c;
1467 
1468             c = qb_ipcs_connection_next_get(ipcs_shm, last);
1469 
1470             crm_debug("Disconnecting non-blocking r/w client %p...", last);
1471             qb_ipcs_disconnect(last);
1472             qb_ipcs_connection_unref(last);
1473             disconnects++;
1474         }
1475 
1476         disconnects += pcmk__ipc_client_count();
1477 
1478         crm_debug("Disconnecting %d remote clients", pcmk__ipc_client_count());
1479         pcmk__foreach_ipc_client(disconnect_remote_client, NULL);
1480         crm_info("Disconnected %d clients", disconnects);
1481     }
1482 
1483     qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE);
1484 
1485     if (pcmk__ipc_client_count() == 0) {
1486         crm_info("All clients disconnected (%d)", srv_stats.active_connections);
1487         initiate_exit();
1488 
1489     } else {
1490         crm_info("Waiting on %d clients to disconnect (%d)",
1491                  pcmk__ipc_client_count(), srv_stats.active_connections);
1492     }
1493 }
1494 
1495 void
1496 initiate_exit(void)
     /* [previous][next][first][last][top][bottom][index][help] */
1497 {
1498     int active = 0;
1499     xmlNode *leaving = NULL;
1500 
1501     active = crm_active_peers();
1502     if (active < 2) {
1503         terminate_cib(__func__, 0);
1504         return;
1505     }
1506 
1507     crm_info("Sending disconnect notification to %d peers...", active);
1508 
1509     leaving = create_xml_node(NULL, "exit-notification");
1510     crm_xml_add(leaving, F_TYPE, "cib");
1511     crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req");
1512 
1513     send_cluster_message(NULL, crm_msg_cib, leaving, TRUE);
1514     free_xml(leaving);
1515 
1516     g_timeout_add(EXIT_ESCALATION_MS, cib_force_exit, NULL);
1517 }
1518 
1519 extern int remote_fd;
1520 extern int remote_tls_fd;
1521 
1522 /*!
1523  * \internal
1524  * \brief Close remote sockets, free the global CIB and quit
1525  *
1526  * \param[in] caller           Name of calling function (for log message)
1527  * \param[in] fast             If -1, skip disconnect; if positive, exit that
1528  */
1529 void
1530 terminate_cib(const char *caller, int fast)
     /* [previous][next][first][last][top][bottom][index][help] */
1531 {
1532     crm_info("%s: Exiting%s...", caller,
1533              (fast > 0)? " fast" : mainloop ? " from mainloop" : "");
1534 
1535     if (remote_fd > 0) {
1536         close(remote_fd);
1537         remote_fd = 0;
1538     }
1539     if (remote_tls_fd > 0) {
1540         close(remote_tls_fd);
1541         remote_tls_fd = 0;
1542     }
1543 
1544     uninitializeCib();
1545 
1546     if (fast > 0) {
1547         /* Quit fast on error */
1548         pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1549         crm_exit(fast);
1550 
1551     } else if ((mainloop != NULL) && g_main_loop_is_running(mainloop)) {
1552         /* Quit via returning from the main loop. If fast == -1, we skip the
1553          * disconnect here, and it will be done when the main loop returns
1554          * (this allows the peer status callback to avoid messing with the
1555          * peer caches).
1556          */
1557         if (fast == 0) {
1558             crm_cluster_disconnect(&crm_cluster);
1559         }
1560         g_main_loop_quit(mainloop);
1561 
1562     } else {
1563         /* Quit via clean exit. Even the peer status callback can disconnect
1564          * here, because we're not returning control to the caller. */
1565         crm_cluster_disconnect(&crm_cluster);
1566         pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1567         crm_exit(CRM_EX_OK);
1568     }
1569 }

/* [previous][next][first][last][top][bottom][index][help] */