root/lib/pengine/unpack.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. is_dangling_guest_node
  2. pe_fence_node
  3. set_if_xpath
  4. unpack_config
  5. pe_create_node
  6. expand_remote_rsc_meta
  7. handle_startup_fencing
  8. unpack_nodes
  9. setup_container
  10. unpack_remote_nodes
  11. link_rsc2remotenode
  12. destroy_tag
  13. unpack_resources
  14. unpack_tags
  15. unpack_ticket_state
  16. unpack_tickets_state
  17. unpack_handle_remote_attrs
  18. unpack_transient_attributes
  19. unpack_node_state
  20. unpack_node_history
  21. unpack_status
  22. determine_online_status_no_fencing
  23. determine_online_status_fencing
  24. determine_remote_online_status
  25. determine_online_status
  26. pe_base_name_end
  27. clone_strip
  28. clone_zero
  29. create_fake_resource
  30. create_anonymous_orphan
  31. find_anonymous_clone
  32. unpack_find_resource
  33. process_orphan_resource
  34. process_rsc_state
  35. process_recurring
  36. calculate_active_ops
  37. unpack_shutdown_lock
  38. unpack_lrm_resource
  39. handle_orphaned_container_fillers
  40. unpack_node_lrm
  41. set_active
  42. set_node_score
  43. find_lrm_op
  44. find_lrm_resource
  45. unknown_on_node
  46. monitor_not_running_after
  47. non_monitor_after
  48. newer_state_after_migrate
  49. get_migration_node_names
  50. add_dangling_migration
  51. unpack_migrate_to_success
  52. unpack_migrate_to_failure
  53. unpack_migrate_from_failure
  54. record_failed_op
  55. last_change_str
  56. cmp_on_fail
  57. ban_from_all_nodes
  58. unpack_rsc_op_failure
  59. block_if_unrecoverable
  60. remap_because
  61. remap_operation
  62. should_clear_for_param_change
  63. order_after_remote_fencing
  64. should_ignore_failure_timeout
  65. check_operation_expiry
  66. pe__target_rc_from_xml
  67. get_action_on_fail
  68. update_resource_state
  69. can_affect_state
  70. unpack_action_result
  71. process_expired_result
  72. mask_probe_failure
  73. failure_is_newer
  74. process_pending_action
  75. unpack_rsc_op
  76. add_node_attrs
  77. extract_operations
  78. find_operations

   1 /*
   2  * Copyright 2004-2023 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdio.h>
  13 #include <string.h>
  14 #include <glib.h>
  15 #include <time.h>
  16 
  17 #include <crm/crm.h>
  18 #include <crm/services.h>
  19 #include <crm/msg_xml.h>
  20 #include <crm/common/xml.h>
  21 #include <crm/common/xml_internal.h>
  22 
  23 #include <crm/common/util.h>
  24 #include <crm/pengine/rules.h>
  25 #include <crm/pengine/internal.h>
  26 #include <pe_status_private.h>
  27 
  28 CRM_TRACE_INIT_DATA(pe_status);
  29 
  30 // A (parsed) resource action history entry
  31 struct action_history {
  32     pe_resource_t *rsc;       // Resource that history is for
  33     pe_node_t *node;          // Node that history is for
  34     xmlNode *xml;             // History entry XML
  35 
  36     // Parsed from entry XML
  37     const char *id;           // XML ID of history entry
  38     const char *key;          // Operation key of action
  39     const char *task;         // Action name
  40     const char *exit_reason;  // Exit reason given for result
  41     guint interval_ms;        // Action interval
  42     int call_id;              // Call ID of action
  43     int expected_exit_status; // Expected exit status of action
  44     int exit_status;          // Actual exit status of action
  45     int execution_status;     // Execution status of action
  46 };
  47 
  48 /* This uses pcmk__set_flags_as()/pcmk__clear_flags_as() directly rather than
  49  * use pe__set_working_set_flags()/pe__clear_working_set_flags() so that the
  50  * flag is stringified more readably in log messages.
  51  */
  52 #define set_config_flag(data_set, option, flag) do {                        \
  53         const char *scf_value = pe_pref((data_set)->config_hash, (option)); \
  54         if (scf_value != NULL) {                                            \
  55             if (crm_is_true(scf_value)) {                                   \
  56                 (data_set)->flags = pcmk__set_flags_as(__func__, __LINE__,  \
  57                                     LOG_TRACE, "Working set",               \
  58                                     crm_system_name, (data_set)->flags,     \
  59                                     (flag), #flag);                         \
  60             } else {                                                        \
  61                 (data_set)->flags = pcmk__clear_flags_as(__func__, __LINE__,\
  62                                     LOG_TRACE, "Working set",               \
  63                                     crm_system_name, (data_set)->flags,     \
  64                                     (flag), #flag);                         \
  65             }                                                               \
  66         }                                                                   \
  67     } while(0)
  68 
  69 static void unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
  70                           xmlNode **last_failure,
  71                           enum action_fail_response *failed);
  72 static void determine_remote_online_status(pe_working_set_t *data_set,
  73                                            pe_node_t *this_node);
  74 static void add_node_attrs(const xmlNode *xml_obj, pe_node_t *node,
  75                            bool overwrite, pe_working_set_t *data_set);
  76 static void determine_online_status(const xmlNode *node_state,
  77                                     pe_node_t *this_node,
  78                                     pe_working_set_t *data_set);
  79 
  80 static void unpack_node_lrm(pe_node_t *node, const xmlNode *xml,
  81                             pe_working_set_t *data_set);
  82 
  83 
  84 // Bitmask for warnings we only want to print once
  85 uint32_t pe_wo = 0;
  86 
  87 static gboolean
  88 is_dangling_guest_node(pe_node_t *node)
     /* [previous][next][first][last][top][bottom][index][help] */
  89 {
  90     /* we are looking for a remote-node that was supposed to be mapped to a
  91      * container resource, but all traces of that container have disappeared 
  92      * from both the config and the status section. */
  93     if (pe__is_guest_or_remote_node(node) &&
  94         node->details->remote_rsc &&
  95         node->details->remote_rsc->container == NULL &&
  96         pcmk_is_set(node->details->remote_rsc->flags,
  97                     pe_rsc_orphan_container_filler)) {
  98         return TRUE;
  99     }
 100 
 101     return FALSE;
 102 }
 103 
 104 /*!
 105  * \brief Schedule a fence action for a node
 106  *
 107  * \param[in,out] data_set  Current working set of cluster
 108  * \param[in,out] node      Node to fence
 109  * \param[in]     reason    Text description of why fencing is needed
 110  * \param[in]     priority_delay  Whether to consider `priority-fencing-delay`
 111  */
 112 void
 113 pe_fence_node(pe_working_set_t * data_set, pe_node_t * node,
     /* [previous][next][first][last][top][bottom][index][help] */
 114               const char *reason, bool priority_delay)
 115 {
 116     CRM_CHECK(node, return);
 117 
 118     /* A guest node is fenced by marking its container as failed */
 119     if (pe__is_guest_node(node)) {
 120         pe_resource_t *rsc = node->details->remote_rsc->container;
 121 
 122         if (!pcmk_is_set(rsc->flags, pe_rsc_failed)) {
 123             if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 124                 crm_notice("Not fencing guest node %s "
 125                            "(otherwise would because %s): "
 126                            "its guest resource %s is unmanaged",
 127                            pe__node_name(node), reason, rsc->id);
 128             } else {
 129                 crm_warn("Guest node %s will be fenced "
 130                          "(by recovering its guest resource %s): %s",
 131                          pe__node_name(node), rsc->id, reason);
 132 
 133                 /* We don't mark the node as unclean because that would prevent the
 134                  * node from running resources. We want to allow it to run resources
 135                  * in this transition if the recovery succeeds.
 136                  */
 137                 node->details->remote_requires_reset = TRUE;
 138                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
 139             }
 140         }
 141 
 142     } else if (is_dangling_guest_node(node)) {
 143         crm_info("Cleaning up dangling connection for guest node %s: "
 144                  "fencing was already done because %s, "
 145                  "and guest resource no longer exists",
 146                  pe__node_name(node), reason);
 147         pe__set_resource_flags(node->details->remote_rsc,
 148                                pe_rsc_failed|pe_rsc_stop);
 149 
 150     } else if (pe__is_remote_node(node)) {
 151         pe_resource_t *rsc = node->details->remote_rsc;
 152 
 153         if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 154             crm_notice("Not fencing remote node %s "
 155                        "(otherwise would because %s): connection is unmanaged",
 156                        pe__node_name(node), reason);
 157         } else if(node->details->remote_requires_reset == FALSE) {
 158             node->details->remote_requires_reset = TRUE;
 159             crm_warn("Remote node %s %s: %s",
 160                      pe__node_name(node),
 161                      pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 162                      reason);
 163         }
 164         node->details->unclean = TRUE;
 165         // No need to apply `priority-fencing-delay` for remote nodes
 166         pe_fence_op(node, NULL, TRUE, reason, FALSE, data_set);
 167 
 168     } else if (node->details->unclean) {
 169         crm_trace("Cluster node %s %s because %s",
 170                   pe__node_name(node),
 171                   pe_can_fence(data_set, node)? "would also be fenced" : "also is unclean",
 172                   reason);
 173 
 174     } else {
 175         crm_warn("Cluster node %s %s: %s",
 176                  pe__node_name(node),
 177                  pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 178                  reason);
 179         node->details->unclean = TRUE;
 180         pe_fence_op(node, NULL, TRUE, reason, priority_delay, data_set);
 181     }
 182 }
 183 
 184 // @TODO xpaths can't handle templates, rules, or id-refs
 185 
 186 // nvpair with provides or requires set to unfencing
 187 #define XPATH_UNFENCING_NVPAIR XML_CIB_TAG_NVPAIR                \
 188     "[(@" XML_NVPAIR_ATTR_NAME "='" PCMK_STONITH_PROVIDES "'"    \
 189     "or @" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_REQUIRES "') " \
 190     "and @" XML_NVPAIR_ATTR_VALUE "='" PCMK__VALUE_UNFENCING "']"
 191 
 192 // unfencing in rsc_defaults or any resource
 193 #define XPATH_ENABLE_UNFENCING \
 194     "/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RESOURCES   \
 195     "//" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR                                               \
 196     "|/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RSCCONFIG  \
 197     "/" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR
 198 
 199 static void
 200 set_if_xpath(uint64_t flag, const char *xpath, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 201 {
 202     xmlXPathObjectPtr result = NULL;
 203 
 204     if (!pcmk_is_set(data_set->flags, flag)) {
 205         result = xpath_search(data_set->input, xpath);
 206         if (result && (numXpathResults(result) > 0)) {
 207             pe__set_working_set_flags(data_set, flag);
 208         }
 209         freeXpathObject(result);
 210     }
 211 }
 212 
 213 gboolean
 214 unpack_config(xmlNode * config, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 215 {
 216     const char *value = NULL;
 217     GHashTable *config_hash = pcmk__strkey_table(free, free);
 218 
 219     pe_rule_eval_data_t rule_data = {
 220         .node_hash = NULL,
 221         .role = RSC_ROLE_UNKNOWN,
 222         .now = data_set->now,
 223         .match_data = NULL,
 224         .rsc_data = NULL,
 225         .op_data = NULL
 226     };
 227 
 228     data_set->config_hash = config_hash;
 229 
 230     pe__unpack_dataset_nvpairs(config, XML_CIB_TAG_PROPSET, &rule_data, config_hash,
 231                                CIB_OPTIONS_FIRST, FALSE, data_set);
 232 
 233     verify_pe_options(data_set->config_hash);
 234 
 235     set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
 236     if (!pcmk_is_set(data_set->flags, pe_flag_startup_probes)) {
 237         crm_info("Startup probes: disabled (dangerous)");
 238     }
 239 
 240     value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
 241     if (value && crm_is_true(value)) {
 242         crm_info("Watchdog-based self-fencing will be performed via SBD if "
 243                  "fencing is required and stonith-watchdog-timeout is nonzero");
 244         pe__set_working_set_flags(data_set, pe_flag_have_stonith_resource);
 245     }
 246 
 247     /* Set certain flags via xpath here, so they can be used before the relevant
 248      * configuration sections are unpacked.
 249      */
 250     set_if_xpath(pe_flag_enable_unfencing, XPATH_ENABLE_UNFENCING, data_set);
 251 
 252     value = pe_pref(data_set->config_hash, "stonith-timeout");
 253     data_set->stonith_timeout = (int) crm_parse_interval_spec(value);
 254     crm_debug("STONITH timeout: %d", data_set->stonith_timeout);
 255 
 256     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 257     crm_debug("STONITH of failed nodes is %s",
 258               pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)? "enabled" : "disabled");
 259 
 260     data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
 261     if (!strcmp(data_set->stonith_action, "poweroff")) {
 262         pe_warn_once(pe_wo_poweroff,
 263                      "Support for stonith-action of 'poweroff' is deprecated "
 264                      "and will be removed in a future release (use 'off' instead)");
 265         data_set->stonith_action = "off";
 266     }
 267     crm_trace("STONITH will %s nodes", data_set->stonith_action);
 268 
 269     set_config_flag(data_set, "concurrent-fencing", pe_flag_concurrent_fencing);
 270     crm_debug("Concurrent fencing is %s",
 271               pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)? "enabled" : "disabled");
 272 
 273     value = pe_pref(data_set->config_hash,
 274                     XML_CONFIG_ATTR_PRIORITY_FENCING_DELAY);
 275     if (value) {
 276         data_set->priority_fencing_delay = crm_parse_interval_spec(value) / 1000;
 277         crm_trace("Priority fencing delay is %ds", data_set->priority_fencing_delay);
 278     }
 279 
 280     set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
 281     crm_debug("Stop all active resources: %s",
 282               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_stop_everything)));
 283 
 284     set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
 285     if (pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)) {
 286         crm_debug("Cluster is symmetric" " - resources can run anywhere by default");
 287     }
 288 
 289     value = pe_pref(data_set->config_hash, "no-quorum-policy");
 290 
 291     if (pcmk__str_eq(value, "ignore", pcmk__str_casei)) {
 292         data_set->no_quorum_policy = no_quorum_ignore;
 293 
 294     } else if (pcmk__str_eq(value, "freeze", pcmk__str_casei)) {
 295         data_set->no_quorum_policy = no_quorum_freeze;
 296 
 297     } else if (pcmk__str_eq(value, "demote", pcmk__str_casei)) {
 298         data_set->no_quorum_policy = no_quorum_demote;
 299 
 300     } else if (pcmk__str_eq(value, "suicide", pcmk__str_casei)) {
 301         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 302             int do_panic = 0;
 303 
 304             crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC,
 305                                   &do_panic);
 306             if (do_panic || pcmk_is_set(data_set->flags, pe_flag_have_quorum)) {
 307                 data_set->no_quorum_policy = no_quorum_suicide;
 308             } else {
 309                 crm_notice("Resetting no-quorum-policy to 'stop': cluster has never had quorum");
 310                 data_set->no_quorum_policy = no_quorum_stop;
 311             }
 312         } else {
 313             pcmk__config_err("Resetting no-quorum-policy to 'stop' because "
 314                              "fencing is disabled");
 315             data_set->no_quorum_policy = no_quorum_stop;
 316         }
 317 
 318     } else {
 319         data_set->no_quorum_policy = no_quorum_stop;
 320     }
 321 
 322     switch (data_set->no_quorum_policy) {
 323         case no_quorum_freeze:
 324             crm_debug("On loss of quorum: Freeze resources");
 325             break;
 326         case no_quorum_stop:
 327             crm_debug("On loss of quorum: Stop ALL resources");
 328             break;
 329         case no_quorum_demote:
 330             crm_debug("On loss of quorum: "
 331                       "Demote promotable resources and stop other resources");
 332             break;
 333         case no_quorum_suicide:
 334             crm_notice("On loss of quorum: Fence all remaining nodes");
 335             break;
 336         case no_quorum_ignore:
 337             crm_notice("On loss of quorum: Ignore");
 338             break;
 339     }
 340 
 341     set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
 342     crm_trace("Orphan resources are %s",
 343               pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)? "stopped" : "ignored");
 344 
 345     set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
 346     crm_trace("Orphan resource actions are %s",
 347               pcmk_is_set(data_set->flags, pe_flag_stop_action_orphans)? "stopped" : "ignored");
 348 
 349     value = pe_pref(data_set->config_hash, "remove-after-stop");
 350     if (value != NULL) {
 351         if (crm_is_true(value)) {
 352             pe__set_working_set_flags(data_set, pe_flag_remove_after_stop);
 353 #ifndef PCMK__COMPAT_2_0
 354             pe_warn_once(pe_wo_remove_after,
 355                          "Support for the remove-after-stop cluster property is"
 356                          " deprecated and will be removed in a future release");
 357 #endif
 358         } else {
 359             pe__clear_working_set_flags(data_set, pe_flag_remove_after_stop);
 360         }
 361     }
 362 
 363     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
 364     crm_trace("Maintenance mode: %s",
 365               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_maintenance_mode)));
 366 
 367     set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
 368     crm_trace("Start failures are %s",
 369               pcmk_is_set(data_set->flags, pe_flag_start_failure_fatal)? "always fatal" : "handled by failcount");
 370 
 371     if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 372         set_config_flag(data_set, "startup-fencing", pe_flag_startup_fencing);
 373     }
 374     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 375         crm_trace("Unseen nodes will be fenced");
 376     } else {
 377         pe_warn_once(pe_wo_blind, "Blind faith: not fencing unseen nodes");
 378     }
 379 
 380     pe__unpack_node_health_scores(data_set);
 381 
 382     data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
 383     crm_trace("Placement strategy: %s", data_set->placement_strategy);
 384 
 385     set_config_flag(data_set, "shutdown-lock", pe_flag_shutdown_lock);
 386     crm_trace("Resources will%s be locked to cleanly shut down nodes",
 387               (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)? "" : " not"));
 388     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
 389         value = pe_pref(data_set->config_hash,
 390                         XML_CONFIG_ATTR_SHUTDOWN_LOCK_LIMIT);
 391         data_set->shutdown_lock = crm_parse_interval_spec(value) / 1000;
 392         crm_trace("Shutdown locks expire after %us", data_set->shutdown_lock);
 393     }
 394 
 395     return TRUE;
 396 }
 397 
 398 pe_node_t *
 399 pe_create_node(const char *id, const char *uname, const char *type,
     /* [previous][next][first][last][top][bottom][index][help] */
 400                const char *score, pe_working_set_t * data_set)
 401 {
 402     pe_node_t *new_node = NULL;
 403 
 404     if (pe_find_node(data_set->nodes, uname) != NULL) {
 405         pcmk__config_warn("More than one node entry has name '%s'", uname);
 406     }
 407 
 408     new_node = calloc(1, sizeof(pe_node_t));
 409     if (new_node == NULL) {
 410         return NULL;
 411     }
 412 
 413     new_node->weight = char2score(score);
 414     new_node->details = calloc(1, sizeof(struct pe_node_shared_s));
 415 
 416     if (new_node->details == NULL) {
 417         free(new_node);
 418         return NULL;
 419     }
 420 
 421     crm_trace("Creating node for entry %s/%s", uname, id);
 422     new_node->details->id = id;
 423     new_node->details->uname = uname;
 424     new_node->details->online = FALSE;
 425     new_node->details->shutdown = FALSE;
 426     new_node->details->rsc_discovery_enabled = TRUE;
 427     new_node->details->running_rsc = NULL;
 428     new_node->details->data_set = data_set;
 429 
 430     if (pcmk__str_eq(type, "member", pcmk__str_null_matches | pcmk__str_casei)) {
 431         new_node->details->type = node_member;
 432 
 433     } else if (pcmk__str_eq(type, "remote", pcmk__str_casei)) {
 434         new_node->details->type = node_remote;
 435         pe__set_working_set_flags(data_set, pe_flag_have_remote_nodes);
 436 
 437     } else {
 438         /* @COMPAT 'ping' is the default for backward compatibility, but it
 439          * should be changed to 'member' at a compatibility break
 440          */
 441         if (!pcmk__str_eq(type, "ping", pcmk__str_casei)) {
 442             pcmk__config_warn("Node %s has unrecognized type '%s', "
 443                               "assuming 'ping'", pcmk__s(uname, "without name"),
 444                               type);
 445         }
 446         pe_warn_once(pe_wo_ping_node,
 447                      "Support for nodes of type 'ping' (such as %s) is "
 448                      "deprecated and will be removed in a future release",
 449                      pcmk__s(uname, "unnamed node"));
 450         new_node->details->type = node_ping;
 451     }
 452 
 453     new_node->details->attrs = pcmk__strkey_table(free, free);
 454 
 455     if (pe__is_guest_or_remote_node(new_node)) {
 456         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 457                             strdup("remote"));
 458     } else {
 459         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 460                             strdup("cluster"));
 461     }
 462 
 463     new_node->details->utilization = pcmk__strkey_table(free, free);
 464     new_node->details->digest_cache = pcmk__strkey_table(free,
 465                                                           pe__free_digests);
 466 
 467     data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node,
 468                                            pe__cmp_node_name);
 469     return new_node;
 470 }
 471 
 472 static const char *
 473 expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, pe_working_set_t *data)
     /* [previous][next][first][last][top][bottom][index][help] */
 474 {
 475     xmlNode *attr_set = NULL;
 476     xmlNode *attr = NULL;
 477 
 478     const char *container_id = ID(xml_obj);
 479     const char *remote_name = NULL;
 480     const char *remote_server = NULL;
 481     const char *remote_port = NULL;
 482     const char *connect_timeout = "60s";
 483     const char *remote_allow_migrate=NULL;
 484     const char *is_managed = NULL;
 485 
 486     for (attr_set = pcmk__xe_first_child(xml_obj); attr_set != NULL;
 487          attr_set = pcmk__xe_next(attr_set)) {
 488 
 489         if (!pcmk__str_eq((const char *)attr_set->name, XML_TAG_META_SETS,
 490                           pcmk__str_casei)) {
 491             continue;
 492         }
 493 
 494         for (attr = pcmk__xe_first_child(attr_set); attr != NULL;
 495              attr = pcmk__xe_next(attr)) {
 496             const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
 497             const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);
 498 
 499             if (pcmk__str_eq(name, XML_RSC_ATTR_REMOTE_NODE, pcmk__str_casei)) {
 500                 remote_name = value;
 501             } else if (pcmk__str_eq(name, "remote-addr", pcmk__str_casei)) {
 502                 remote_server = value;
 503             } else if (pcmk__str_eq(name, "remote-port", pcmk__str_casei)) {
 504                 remote_port = value;
 505             } else if (pcmk__str_eq(name, "remote-connect-timeout", pcmk__str_casei)) {
 506                 connect_timeout = value;
 507             } else if (pcmk__str_eq(name, "remote-allow-migrate", pcmk__str_casei)) {
 508                 remote_allow_migrate=value;
 509             } else if (pcmk__str_eq(name, XML_RSC_ATTR_MANAGED, pcmk__str_casei)) {
 510                 is_managed = value;
 511             }
 512         }
 513     }
 514 
 515     if (remote_name == NULL) {
 516         return NULL;
 517     }
 518 
 519     if (pe_find_resource(data->resources, remote_name) != NULL) {
 520         return NULL;
 521     }
 522 
 523     pe_create_remote_xml(parent, remote_name, container_id,
 524                          remote_allow_migrate, is_managed,
 525                          connect_timeout, remote_server, remote_port);
 526     return remote_name;
 527 }
 528 
 529 static void
 530 handle_startup_fencing(pe_working_set_t *data_set, pe_node_t *new_node)
     /* [previous][next][first][last][top][bottom][index][help] */
 531 {
 532     if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
 533         /* Ignore fencing for remote nodes that don't have a connection resource
 534          * associated with them. This happens when remote node entries get left
 535          * in the nodes section after the connection resource is removed.
 536          */
 537         return;
 538     }
 539 
 540     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 541         // All nodes are unclean until we've seen their status entry
 542         new_node->details->unclean = TRUE;
 543 
 544     } else {
 545         // Blind faith ...
 546         new_node->details->unclean = FALSE;
 547     }
 548 
 549     /* We need to be able to determine if a node's status section
 550      * exists or not separate from whether the node is unclean. */
 551     new_node->details->unseen = TRUE;
 552 }
 553 
 554 gboolean
 555 unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 556 {
 557     xmlNode *xml_obj = NULL;
 558     pe_node_t *new_node = NULL;
 559     const char *id = NULL;
 560     const char *uname = NULL;
 561     const char *type = NULL;
 562     const char *score = NULL;
 563 
 564     for (xml_obj = pcmk__xe_first_child(xml_nodes); xml_obj != NULL;
 565          xml_obj = pcmk__xe_next(xml_obj)) {
 566 
 567         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, pcmk__str_none)) {
 568             new_node = NULL;
 569 
 570             id = crm_element_value(xml_obj, XML_ATTR_ID);
 571             uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
 572             type = crm_element_value(xml_obj, XML_ATTR_TYPE);
 573             score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
 574             crm_trace("Processing node %s/%s", uname, id);
 575 
 576             if (id == NULL) {
 577                 pcmk__config_err("Ignoring <" XML_CIB_TAG_NODE
 578                                  "> entry in configuration without id");
 579                 continue;
 580             }
 581             new_node = pe_create_node(id, uname, type, score, data_set);
 582 
 583             if (new_node == NULL) {
 584                 return FALSE;
 585             }
 586 
 587             handle_startup_fencing(data_set, new_node);
 588 
 589             add_node_attrs(xml_obj, new_node, FALSE, data_set);
 590 
 591             crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
 592         }
 593     }
 594 
 595     if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
 596         crm_info("Creating a fake local node");
 597         pe_create_node(data_set->localhost, data_set->localhost, NULL, 0,
 598                        data_set);
 599     }
 600 
 601     return TRUE;
 602 }
 603 
 604 static void
 605 setup_container(pe_resource_t * rsc, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 606 {
 607     const char *container_id = NULL;
 608 
 609     if (rsc->children) {
 610         g_list_foreach(rsc->children, (GFunc) setup_container, data_set);
 611         return;
 612     }
 613 
 614     container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
 615     if (container_id && !pcmk__str_eq(container_id, rsc->id, pcmk__str_casei)) {
 616         pe_resource_t *container = pe_find_resource(data_set->resources, container_id);
 617 
 618         if (container) {
 619             rsc->container = container;
 620             pe__set_resource_flags(container, pe_rsc_is_container);
 621             container->fillers = g_list_append(container->fillers, rsc);
 622             pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
 623         } else {
 624             pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
 625         }
 626     }
 627 }
 628 
 629 gboolean
 630 unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 631 {
 632     xmlNode *xml_obj = NULL;
 633 
 634     /* Create remote nodes and guest nodes from the resource configuration
 635      * before unpacking resources.
 636      */
 637     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 638          xml_obj = pcmk__xe_next(xml_obj)) {
 639 
 640         const char *new_node_id = NULL;
 641 
 642         /* Check for remote nodes, which are defined by ocf:pacemaker:remote
 643          * primitives.
 644          */
 645         if (xml_contains_remote_node(xml_obj)) {
 646             new_node_id = ID(xml_obj);
 647             /* The "pe_find_node" check is here to make sure we don't iterate over
 648              * an expanded node that has already been added to the node list. */
 649             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 650                 crm_trace("Found remote node %s defined by resource %s",
 651                           new_node_id, ID(xml_obj));
 652                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 653                                data_set);
 654             }
 655             continue;
 656         }
 657 
 658         /* Check for guest nodes, which are defined by special meta-attributes
 659          * of a primitive of any type (for example, VirtualDomain or Xen).
 660          */
 661         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RESOURCE, pcmk__str_none)) {
 662             /* This will add an ocf:pacemaker:remote primitive to the
 663              * configuration for the guest node's connection, to be unpacked
 664              * later.
 665              */
 666             new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, data_set);
 667             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 668                 crm_trace("Found guest node %s in resource %s",
 669                           new_node_id, ID(xml_obj));
 670                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 671                                data_set);
 672             }
 673             continue;
 674         }
 675 
 676         /* Check for guest nodes inside a group. Clones are currently not
 677          * supported as guest nodes.
 678          */
 679         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_GROUP, pcmk__str_none)) {
 680             xmlNode *xml_obj2 = NULL;
 681             for (xml_obj2 = pcmk__xe_first_child(xml_obj); xml_obj2 != NULL;
 682                  xml_obj2 = pcmk__xe_next(xml_obj2)) {
 683 
 684                 new_node_id = expand_remote_rsc_meta(xml_obj2, xml_resources, data_set);
 685 
 686                 if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 687                     crm_trace("Found guest node %s in resource %s inside group %s",
 688                               new_node_id, ID(xml_obj2), ID(xml_obj));
 689                     pe_create_node(new_node_id, new_node_id, "remote", NULL,
 690                                    data_set);
 691                 }
 692             }
 693         }
 694     }
 695     return TRUE;
 696 }
 697 
 698 /* Call this after all the nodes and resources have been
 699  * unpacked, but before the status section is read.
 700  *
 701  * A remote node's online status is reflected by the state
 702  * of the remote node's connection resource. We need to link
 703  * the remote node to this connection resource so we can have
 704  * easy access to the connection resource during the scheduler calculations.
 705  */
 706 static void
 707 link_rsc2remotenode(pe_working_set_t *data_set, pe_resource_t *new_rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
 708 {
 709     pe_node_t *remote_node = NULL;
 710 
 711     if (new_rsc->is_remote_node == FALSE) {
 712         return;
 713     }
 714 
 715     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 716         /* remote_nodes and remote_resources are not linked in quick location calculations */
 717         return;
 718     }
 719 
 720     remote_node = pe_find_node(data_set->nodes, new_rsc->id);
 721     CRM_CHECK(remote_node != NULL, return);
 722 
 723     pe_rsc_trace(new_rsc, "Linking remote connection resource %s to %s",
 724                  new_rsc->id, pe__node_name(remote_node));
 725     remote_node->details->remote_rsc = new_rsc;
 726 
 727     if (new_rsc->container == NULL) {
 728         /* Handle start-up fencing for remote nodes (as opposed to guest nodes)
 729          * the same as is done for cluster nodes.
 730          */
 731         handle_startup_fencing(data_set, remote_node);
 732 
 733     } else {
 734         /* pe_create_node() marks the new node as "remote" or "cluster"; now
 735          * that we know the node is a guest node, update it correctly.
 736          */
 737         g_hash_table_replace(remote_node->details->attrs, strdup(CRM_ATTR_KIND),
 738                              strdup("container"));
 739     }
 740 }
 741 
 742 static void
 743 destroy_tag(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 744 {
 745     pe_tag_t *tag = data;
 746 
 747     if (tag) {
 748         free(tag->id);
 749         g_list_free_full(tag->refs, free);
 750         free(tag);
 751     }
 752 }
 753 
 754 /*!
 755  * \internal
 756  * \brief Parse configuration XML for resource information
 757  *
 758  * \param[in]     xml_resources  Top of resource configuration XML
 759  * \param[in,out] data_set       Where to put resource information
 760  *
 761  * \return TRUE
 762  *
 763  * \note unpack_remote_nodes() MUST be called before this, so that the nodes can
 764  *       be used when pe__unpack_resource() calls resource_location()
 765  */
 766 gboolean
 767 unpack_resources(const xmlNode *xml_resources, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 768 {
 769     xmlNode *xml_obj = NULL;
 770     GList *gIter = NULL;
 771 
 772     data_set->template_rsc_sets = pcmk__strkey_table(free, destroy_tag);
 773 
 774     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 775          xml_obj = pcmk__xe_next(xml_obj)) {
 776 
 777         pe_resource_t *new_rsc = NULL;
 778         const char *id = ID(xml_obj);
 779 
 780         if (pcmk__str_empty(id)) {
 781             pcmk__config_err("Ignoring <%s> resource without ID",
 782                              crm_element_name(xml_obj));
 783             continue;
 784         }
 785 
 786         if (pcmk__str_eq((const char *) xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE,
 787                          pcmk__str_none)) {
 788             if (g_hash_table_lookup_extended(data_set->template_rsc_sets, id,
 789                                              NULL, NULL) == FALSE) {
 790                 /* Record the template's ID for the knowledge of its existence anyway. */
 791                 g_hash_table_insert(data_set->template_rsc_sets, strdup(id), NULL);
 792             }
 793             continue;
 794         }
 795 
 796         crm_trace("Unpacking <%s " XML_ATTR_ID "='%s'>",
 797                   crm_element_name(xml_obj), id);
 798         if (pe__unpack_resource(xml_obj, &new_rsc, NULL,
 799                                 data_set) == pcmk_rc_ok) {
 800             data_set->resources = g_list_append(data_set->resources, new_rsc);
 801             pe_rsc_trace(new_rsc, "Added resource %s", new_rsc->id);
 802 
 803         } else {
 804             pcmk__config_err("Ignoring <%s> resource '%s' "
 805                              "because configuration is invalid",
 806                              crm_element_name(xml_obj), id);
 807         }
 808     }
 809 
 810     for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
 811         pe_resource_t *rsc = (pe_resource_t *) gIter->data;
 812 
 813         setup_container(rsc, data_set);
 814         link_rsc2remotenode(data_set, rsc);
 815     }
 816 
 817     data_set->resources = g_list_sort(data_set->resources,
 818                                       pe__cmp_rsc_priority);
 819     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 820         /* Ignore */
 821 
 822     } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
 823                && !pcmk_is_set(data_set->flags, pe_flag_have_stonith_resource)) {
 824 
 825         pcmk__config_err("Resource start-up disabled since no STONITH resources have been defined");
 826         pcmk__config_err("Either configure some or disable STONITH with the stonith-enabled option");
 827         pcmk__config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
 828     }
 829 
 830     return TRUE;
 831 }
 832 
 833 gboolean
 834 unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 835 {
 836     xmlNode *xml_tag = NULL;
 837 
 838     data_set->tags = pcmk__strkey_table(free, destroy_tag);
 839 
 840     for (xml_tag = pcmk__xe_first_child(xml_tags); xml_tag != NULL;
 841          xml_tag = pcmk__xe_next(xml_tag)) {
 842 
 843         xmlNode *xml_obj_ref = NULL;
 844         const char *tag_id = ID(xml_tag);
 845 
 846         if (!pcmk__str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, pcmk__str_none)) {
 847             continue;
 848         }
 849 
 850         if (tag_id == NULL) {
 851             pcmk__config_err("Ignoring <%s> without " XML_ATTR_ID,
 852                              crm_element_name(xml_tag));
 853             continue;
 854         }
 855 
 856         for (xml_obj_ref = pcmk__xe_first_child(xml_tag); xml_obj_ref != NULL;
 857              xml_obj_ref = pcmk__xe_next(xml_obj_ref)) {
 858 
 859             const char *obj_ref = ID(xml_obj_ref);
 860 
 861             if (!pcmk__str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, pcmk__str_none)) {
 862                 continue;
 863             }
 864 
 865             if (obj_ref == NULL) {
 866                 pcmk__config_err("Ignoring <%s> for tag '%s' without " XML_ATTR_ID,
 867                                  crm_element_name(xml_obj_ref), tag_id);
 868                 continue;
 869             }
 870 
 871             if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
 872                 return FALSE;
 873             }
 874         }
 875     }
 876 
 877     return TRUE;
 878 }
 879 
 880 /* The ticket state section:
 881  * "/cib/status/tickets/ticket_state" */
 882 static gboolean
 883 unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 884 {
 885     const char *ticket_id = NULL;
 886     const char *granted = NULL;
 887     const char *last_granted = NULL;
 888     const char *standby = NULL;
 889     xmlAttrPtr xIter = NULL;
 890 
 891     pe_ticket_t *ticket = NULL;
 892 
 893     ticket_id = ID(xml_ticket);
 894     if (pcmk__str_empty(ticket_id)) {
 895         return FALSE;
 896     }
 897 
 898     crm_trace("Processing ticket state for %s", ticket_id);
 899 
 900     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
 901     if (ticket == NULL) {
 902         ticket = ticket_new(ticket_id, data_set);
 903         if (ticket == NULL) {
 904             return FALSE;
 905         }
 906     }
 907 
 908     for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
 909         const char *prop_name = (const char *)xIter->name;
 910         const char *prop_value = crm_element_value(xml_ticket, prop_name);
 911 
 912         if (pcmk__str_eq(prop_name, XML_ATTR_ID, pcmk__str_none)) {
 913             continue;
 914         }
 915         g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
 916     }
 917 
 918     granted = g_hash_table_lookup(ticket->state, "granted");
 919     if (granted && crm_is_true(granted)) {
 920         ticket->granted = TRUE;
 921         crm_info("We have ticket '%s'", ticket->id);
 922     } else {
 923         ticket->granted = FALSE;
 924         crm_info("We do not have ticket '%s'", ticket->id);
 925     }
 926 
 927     last_granted = g_hash_table_lookup(ticket->state, "last-granted");
 928     if (last_granted) {
 929         long long last_granted_ll;
 930 
 931         pcmk__scan_ll(last_granted, &last_granted_ll, 0LL);
 932         ticket->last_granted = (time_t) last_granted_ll;
 933     }
 934 
 935     standby = g_hash_table_lookup(ticket->state, "standby");
 936     if (standby && crm_is_true(standby)) {
 937         ticket->standby = TRUE;
 938         if (ticket->granted) {
 939             crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
 940         }
 941     } else {
 942         ticket->standby = FALSE;
 943     }
 944 
 945     crm_trace("Done with ticket state for %s", ticket_id);
 946 
 947     return TRUE;
 948 }
 949 
 950 static gboolean
 951 unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 952 {
 953     xmlNode *xml_obj = NULL;
 954 
 955     for (xml_obj = pcmk__xe_first_child(xml_tickets); xml_obj != NULL;
 956          xml_obj = pcmk__xe_next(xml_obj)) {
 957 
 958         if (!pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, pcmk__str_none)) {
 959             continue;
 960         }
 961         unpack_ticket_state(xml_obj, data_set);
 962     }
 963 
 964     return TRUE;
 965 }
 966 
 967 static void
 968 unpack_handle_remote_attrs(pe_node_t *this_node, const xmlNode *state,
     /* [previous][next][first][last][top][bottom][index][help] */
 969                            pe_working_set_t *data_set)
 970 {
 971     const char *resource_discovery_enabled = NULL;
 972     const xmlNode *attrs = NULL;
 973     pe_resource_t *rsc = NULL;
 974 
 975     if (!pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
 976         return;
 977     }
 978 
 979     if ((this_node == NULL) || !pe__is_guest_or_remote_node(this_node)) {
 980         return;
 981     }
 982     crm_trace("Processing Pacemaker Remote node %s", pe__node_name(this_node));
 983 
 984     pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_MAINTENANCE),
 985                        &(this_node->details->remote_maintenance), 0);
 986 
 987     rsc = this_node->details->remote_rsc;
 988     if (this_node->details->remote_requires_reset == FALSE) {
 989         this_node->details->unclean = FALSE;
 990         this_node->details->unseen = FALSE;
 991     }
 992     attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
 993     add_node_attrs(attrs, this_node, TRUE, data_set);
 994 
 995     if (pe__shutdown_requested(this_node)) {
 996         crm_info("%s is shutting down", pe__node_name(this_node));
 997         this_node->details->shutdown = TRUE;
 998     }
 999  
1000     if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
1001         crm_info("%s is in standby mode", pe__node_name(this_node));
1002         this_node->details->standby = TRUE;
1003     }
1004 
1005     if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance")) ||
1006         ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed))) {
1007         crm_info("%s is in maintenance mode", pe__node_name(this_node));
1008         this_node->details->maintenance = TRUE;
1009     }
1010 
1011     resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
1012     if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
1013         if (pe__is_remote_node(this_node)
1014             && !pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1015             crm_warn("Ignoring " XML_NODE_ATTR_RSC_DISCOVERY
1016                      " attribute on Pacemaker Remote node %s"
1017                      " because fencing is disabled",
1018                      pe__node_name(this_node));
1019         } else {
1020             /* This is either a remote node with fencing enabled, or a guest
1021              * node. We don't care whether fencing is enabled when fencing guest
1022              * nodes, because they are "fenced" by recovering their containing
1023              * resource.
1024              */
1025             crm_info("%s has resource discovery disabled",
1026                      pe__node_name(this_node));
1027             this_node->details->rsc_discovery_enabled = FALSE;
1028         }
1029     }
1030 }
1031 
1032 /*!
1033  * \internal
1034  * \brief Unpack a cluster node's transient attributes
1035  *
1036  * \param[in]     state     CIB node state XML
1037  * \param[in,out] node      Cluster node whose attributes are being unpacked
1038  * \param[in,out] data_set  Cluster working set
1039  */
1040 static void
1041 unpack_transient_attributes(const xmlNode *state, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1042                             pe_working_set_t *data_set)
1043 {
1044     const char *discovery = NULL;
1045     const xmlNode *attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS,
1046                                          FALSE);
1047 
1048     add_node_attrs(attrs, node, TRUE, data_set);
1049 
1050     if (crm_is_true(pe_node_attribute_raw(node, "standby"))) {
1051         crm_info("%s is in standby mode", pe__node_name(node));
1052         node->details->standby = TRUE;
1053     }
1054 
1055     if (crm_is_true(pe_node_attribute_raw(node, "maintenance"))) {
1056         crm_info("%s is in maintenance mode", pe__node_name(node));
1057         node->details->maintenance = TRUE;
1058     }
1059 
1060     discovery = pe_node_attribute_raw(node, XML_NODE_ATTR_RSC_DISCOVERY);
1061     if ((discovery != NULL) && !crm_is_true(discovery)) {
1062         crm_warn("Ignoring " XML_NODE_ATTR_RSC_DISCOVERY
1063                  " attribute for %s because disabling resource discovery "
1064                  "is not allowed for cluster nodes", pe__node_name(node));
1065     }
1066 }
1067 
1068 /*!
1069  * \internal
1070  * \brief Unpack a node state entry (first pass)
1071  *
1072  * Unpack one node state entry from status. This unpacks information from the
1073  * node_state element itself and node attributes inside it, but not the
1074  * resource history inside it. Multiple passes through the status are needed to
1075  * fully unpack everything.
1076  *
1077  * \param[in]     state     CIB node state XML
1078  * \param[in,out] data_set  Cluster working set
1079  */
1080 static void
1081 unpack_node_state(const xmlNode *state, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1082 {
1083     const char *id = NULL;
1084     const char *uname = NULL;
1085     pe_node_t *this_node = NULL;
1086 
1087     id = crm_element_value(state, XML_ATTR_ID);
1088     if (id == NULL) {
1089         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1090                  XML_ATTR_ID);
1091         return;
1092     }
1093 
1094     uname = crm_element_value(state, XML_ATTR_UNAME);
1095     if (uname == NULL) {
1096         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1097                  XML_ATTR_UNAME);
1098         return;
1099     }
1100 
1101     this_node = pe_find_node_any(data_set->nodes, id, uname);
1102     if (this_node == NULL) {
1103         pcmk__config_warn("Ignoring recorded node state for '%s' because "
1104                           "it is no longer in the configuration", uname);
1105         return;
1106     }
1107 
1108     if (pe__is_guest_or_remote_node(this_node)) {
1109         /* We can't determine the online status of Pacemaker Remote nodes until
1110          * after all resource history has been unpacked. In this first pass, we
1111          * do need to mark whether the node has been fenced, as this plays a
1112          * role during unpacking cluster node resource state.
1113          */
1114         pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_FENCED),
1115                            &(this_node->details->remote_was_fenced), 0);
1116         return;
1117     }
1118 
1119     unpack_transient_attributes(state, this_node, data_set);
1120 
1121     /* Provisionally mark this cluster node as clean. We have at least seen it
1122      * in the current cluster's lifetime.
1123      */
1124     this_node->details->unclean = FALSE;
1125     this_node->details->unseen = FALSE;
1126 
1127     crm_trace("Determining online status of cluster node %s (id %s)",
1128               pe__node_name(this_node), id);
1129     determine_online_status(state, this_node, data_set);
1130 
1131     if (!pcmk_is_set(data_set->flags, pe_flag_have_quorum)
1132         && this_node->details->online
1133         && (data_set->no_quorum_policy == no_quorum_suicide)) {
1134         /* Everything else should flow from this automatically
1135          * (at least until the scheduler becomes able to migrate off
1136          * healthy resources)
1137          */
1138         pe_fence_node(data_set, this_node, "cluster does not have quorum",
1139                       FALSE);
1140     }
1141 }
1142 
1143 /*!
1144  * \internal
1145  * \brief Unpack nodes' resource history as much as possible
1146  *
1147  * Unpack as many nodes' resource history as possible in one pass through the
1148  * status. We need to process Pacemaker Remote nodes' connections/containers
1149  * before unpacking their history; the connection/container history will be
1150  * in another node's history, so it might take multiple passes to unpack
1151  * everything.
1152  *
1153  * \param[in]     status    CIB XML status section
1154  * \param[in]     fence     If true, treat any not-yet-unpacked nodes as unseen
1155  * \param[in,out] data_set  Cluster working set
1156  *
1157  * \return Standard Pacemaker return code (specifically pcmk_rc_ok if done,
1158  *         or EAGAIN if more unpacking remains to be done)
1159  */
1160 static int
1161 unpack_node_history(const xmlNode *status, bool fence,
     /* [previous][next][first][last][top][bottom][index][help] */
1162                     pe_working_set_t *data_set)
1163 {
1164     int rc = pcmk_rc_ok;
1165 
1166     // Loop through all node_state entries in CIB status
1167     for (const xmlNode *state = first_named_child(status, XML_CIB_TAG_STATE);
1168          state != NULL; state = crm_next_same_xml(state)) {
1169 
1170         const char *id = ID(state);
1171         const char *uname = crm_element_value(state, XML_ATTR_UNAME);
1172         pe_node_t *this_node = NULL;
1173 
1174         if ((id == NULL) || (uname == NULL)) {
1175             // Warning already logged in first pass through status section
1176             crm_trace("Not unpacking resource history from malformed "
1177                       XML_CIB_TAG_STATE " without id and/or uname");
1178             continue;
1179         }
1180 
1181         this_node = pe_find_node_any(data_set->nodes, id, uname);
1182         if (this_node == NULL) {
1183             // Warning already logged in first pass through status section
1184             crm_trace("Not unpacking resource history for node %s because "
1185                       "no longer in configuration", id);
1186             continue;
1187         }
1188 
1189         if (this_node->details->unpacked) {
1190             crm_trace("Not unpacking resource history for node %s because "
1191                       "already unpacked", id);
1192             continue;
1193         }
1194 
1195         if (fence) {
1196             // We're processing all remaining nodes
1197 
1198         } else if (pe__is_guest_node(this_node)) {
1199             /* We can unpack a guest node's history only after we've unpacked
1200              * other resource history to the point that we know that the node's
1201              * connection and containing resource are both up.
1202              */
1203             pe_resource_t *rsc = this_node->details->remote_rsc;
1204 
1205             if ((rsc == NULL) || (rsc->role != RSC_ROLE_STARTED)
1206                 || (rsc->container->role != RSC_ROLE_STARTED)) {
1207                 crm_trace("Not unpacking resource history for guest node %s "
1208                           "because container and connection are not known to "
1209                           "be up", id);
1210                 continue;
1211             }
1212 
1213         } else if (pe__is_remote_node(this_node)) {
1214             /* We can unpack a remote node's history only after we've unpacked
1215              * other resource history to the point that we know that the node's
1216              * connection is up, with the exception of when shutdown locks are
1217              * in use.
1218              */
1219             pe_resource_t *rsc = this_node->details->remote_rsc;
1220 
1221             if ((rsc == NULL)
1222                 || (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)
1223                     && (rsc->role != RSC_ROLE_STARTED))) {
1224                 crm_trace("Not unpacking resource history for remote node %s "
1225                           "because connection is not known to be up", id);
1226                 continue;
1227             }
1228 
1229         /* If fencing and shutdown locks are disabled and we're not processing
1230          * unseen nodes, then we don't want to unpack offline nodes until online
1231          * nodes have been unpacked. This allows us to number active clone
1232          * instances first.
1233          */
1234         } else if (!pcmk_any_flags_set(data_set->flags, pe_flag_stonith_enabled
1235                                                         |pe_flag_shutdown_lock)
1236                    && !this_node->details->online) {
1237             crm_trace("Not unpacking resource history for offline "
1238                       "cluster node %s", id);
1239             continue;
1240         }
1241 
1242         if (pe__is_guest_or_remote_node(this_node)) {
1243             determine_remote_online_status(data_set, this_node);
1244             unpack_handle_remote_attrs(this_node, state, data_set);
1245         }
1246 
1247         crm_trace("Unpacking resource history for %snode %s",
1248                   (fence? "unseen " : ""), id);
1249 
1250         this_node->details->unpacked = TRUE;
1251         unpack_node_lrm(this_node, state, data_set);
1252 
1253         rc = EAGAIN; // Other node histories might depend on this one
1254     }
1255     return rc;
1256 }
1257 
1258 /* remove nodes that are down, stopping */
1259 /* create positive rsc_to_node constraints between resources and the nodes they are running on */
1260 /* anything else? */
1261 gboolean
1262 unpack_status(xmlNode * status, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1263 {
1264     xmlNode *state = NULL;
1265 
1266     crm_trace("Beginning unpack");
1267 
1268     if (data_set->tickets == NULL) {
1269         data_set->tickets = pcmk__strkey_table(free, destroy_ticket);
1270     }
1271 
1272     for (state = pcmk__xe_first_child(status); state != NULL;
1273          state = pcmk__xe_next(state)) {
1274 
1275         if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, pcmk__str_none)) {
1276             unpack_tickets_state((xmlNode *) state, data_set);
1277 
1278         } else if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
1279             unpack_node_state(state, data_set);
1280         }
1281     }
1282 
1283     while (unpack_node_history(status, FALSE, data_set) == EAGAIN) {
1284         crm_trace("Another pass through node resource histories is needed");
1285     }
1286 
1287     // Now catch any nodes we didn't see
1288     unpack_node_history(status,
1289                         pcmk_is_set(data_set->flags, pe_flag_stonith_enabled),
1290                         data_set);
1291 
1292     /* Now that we know where resources are, we can schedule stops of containers
1293      * with failed bundle connections
1294      */
1295     if (data_set->stop_needed != NULL) {
1296         for (GList *item = data_set->stop_needed; item; item = item->next) {
1297             pe_resource_t *container = item->data;
1298             pe_node_t *node = pe__current_node(container);
1299 
1300             if (node) {
1301                 stop_action(container, node, FALSE);
1302             }
1303         }
1304         g_list_free(data_set->stop_needed);
1305         data_set->stop_needed = NULL;
1306     }
1307 
1308     /* Now that we know status of all Pacemaker Remote connections and nodes,
1309      * we can stop connections for node shutdowns, and check the online status
1310      * of remote/guest nodes that didn't have any node history to unpack.
1311      */
1312     for (GList *gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
1313         pe_node_t *this_node = gIter->data;
1314 
1315         if (!pe__is_guest_or_remote_node(this_node)) {
1316             continue;
1317         }
1318         if (this_node->details->shutdown
1319             && (this_node->details->remote_rsc != NULL)) {
1320             pe__set_next_role(this_node->details->remote_rsc, RSC_ROLE_STOPPED,
1321                               "remote shutdown");
1322         }
1323         if (!this_node->details->unpacked) {
1324             determine_remote_online_status(data_set, this_node);
1325         }
1326     }
1327 
1328     return TRUE;
1329 }
1330 
1331 static gboolean
1332 determine_online_status_no_fencing(pe_working_set_t *data_set,
     /* [previous][next][first][last][top][bottom][index][help] */
1333                                    const xmlNode *node_state,
1334                                    pe_node_t *this_node)
1335 {
1336     gboolean online = FALSE;
1337     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1338     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1339     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1340     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1341 
1342     if (!crm_is_true(in_cluster)) {
1343         crm_trace("Node is down: in_cluster=%s",
1344                   pcmk__s(in_cluster, "<null>"));
1345 
1346     } else if (pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei)) {
1347         if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1348             online = TRUE;
1349         } else {
1350             crm_debug("Node is not ready to run resources: %s", join);
1351         }
1352 
1353     } else if (this_node->details->expected_up == FALSE) {
1354         crm_trace("Controller is down: "
1355                   "in_cluster=%s is_peer=%s join=%s expected=%s",
1356                   pcmk__s(in_cluster, "<null>"), pcmk__s(is_peer, "<null>"),
1357                   pcmk__s(join, "<null>"), pcmk__s(exp_state, "<null>"));
1358 
1359     } else {
1360         /* mark it unclean */
1361         pe_fence_node(data_set, this_node, "peer is unexpectedly down", FALSE);
1362         crm_info("in_cluster=%s is_peer=%s join=%s expected=%s",
1363                  pcmk__s(in_cluster, "<null>"), pcmk__s(is_peer, "<null>"),
1364                  pcmk__s(join, "<null>"), pcmk__s(exp_state, "<null>"));
1365     }
1366     return online;
1367 }
1368 
1369 static gboolean
1370 determine_online_status_fencing(pe_working_set_t *data_set,
     /* [previous][next][first][last][top][bottom][index][help] */
1371                                 const xmlNode *node_state, pe_node_t *this_node)
1372 {
1373     gboolean online = FALSE;
1374     gboolean do_terminate = FALSE;
1375     bool crmd_online = FALSE;
1376     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1377     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1378     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1379     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1380     const char *terminate = pe_node_attribute_raw(this_node, "terminate");
1381 
1382 /*
1383   - XML_NODE_IN_CLUSTER    ::= true|false
1384   - XML_NODE_IS_PEER       ::= online|offline
1385   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
1386   - XML_NODE_EXPECTED      ::= member|down
1387 */
1388 
1389     if (crm_is_true(terminate)) {
1390         do_terminate = TRUE;
1391 
1392     } else if (terminate != NULL && strlen(terminate) > 0) {
1393         /* could be a time() value */
1394         char t = terminate[0];
1395 
1396         if (t != '0' && isdigit(t)) {
1397             do_terminate = TRUE;
1398         }
1399     }
1400 
1401     crm_trace("%s: in_cluster=%s is_peer=%s join=%s expected=%s term=%d",
1402               pe__node_name(this_node), pcmk__s(in_cluster, "<null>"),
1403               pcmk__s(is_peer, "<null>"), pcmk__s(join, "<null>"),
1404               pcmk__s(exp_state, "<null>"), do_terminate);
1405 
1406     online = crm_is_true(in_cluster);
1407     crmd_online = pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei);
1408     if (exp_state == NULL) {
1409         exp_state = CRMD_JOINSTATE_DOWN;
1410     }
1411 
1412     if (this_node->details->shutdown) {
1413         crm_debug("%s is shutting down", pe__node_name(this_node));
1414 
1415         /* Slightly different criteria since we can't shut down a dead peer */
1416         online = crmd_online;
1417 
1418     } else if (in_cluster == NULL) {
1419         pe_fence_node(data_set, this_node, "peer has not been seen by the cluster", FALSE);
1420 
1421     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_NACK, pcmk__str_casei)) {
1422         pe_fence_node(data_set, this_node,
1423                       "peer failed Pacemaker membership criteria", FALSE);
1424 
1425     } else if (do_terminate == FALSE && pcmk__str_eq(exp_state, CRMD_JOINSTATE_DOWN, pcmk__str_casei)) {
1426 
1427         if (crm_is_true(in_cluster) || crmd_online) {
1428             crm_info("- %s is not ready to run resources",
1429                      pe__node_name(this_node));
1430             this_node->details->standby = TRUE;
1431             this_node->details->pending = TRUE;
1432 
1433         } else {
1434             crm_trace("%s is down or still coming up",
1435                       pe__node_name(this_node));
1436         }
1437 
1438     } else if (do_terminate && pcmk__str_eq(join, CRMD_JOINSTATE_DOWN, pcmk__str_casei)
1439                && crm_is_true(in_cluster) == FALSE && !crmd_online) {
1440         crm_info("%s was just shot", pe__node_name(this_node));
1441         online = FALSE;
1442 
1443     } else if (crm_is_true(in_cluster) == FALSE) {
1444         // Consider `priority-fencing-delay` for lost nodes
1445         pe_fence_node(data_set, this_node, "peer is no longer part of the cluster", TRUE);
1446 
1447     } else if (!crmd_online) {
1448         pe_fence_node(data_set, this_node, "peer process is no longer available", FALSE);
1449 
1450         /* Everything is running at this point, now check join state */
1451     } else if (do_terminate) {
1452         pe_fence_node(data_set, this_node, "termination was requested", FALSE);
1453 
1454     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1455         crm_info("%s is active", pe__node_name(this_node));
1456 
1457     } else if (pcmk__strcase_any_of(join, CRMD_JOINSTATE_PENDING, CRMD_JOINSTATE_DOWN, NULL)) {
1458         crm_info("%s is not ready to run resources", pe__node_name(this_node));
1459         this_node->details->standby = TRUE;
1460         this_node->details->pending = TRUE;
1461 
1462     } else {
1463         pe_fence_node(data_set, this_node, "peer was in an unknown state", FALSE);
1464         crm_warn("%s: in-cluster=%s is-peer=%s join=%s expected=%s term=%d shutdown=%d",
1465                  pe__node_name(this_node), pcmk__s(in_cluster, "<null>"),
1466                  pcmk__s(is_peer, "<null>"), pcmk__s(join, "<null>"),
1467                  pcmk__s(exp_state, "<null>"), do_terminate,
1468                  this_node->details->shutdown);
1469     }
1470 
1471     return online;
1472 }
1473 
1474 static void
1475 determine_remote_online_status(pe_working_set_t * data_set, pe_node_t * this_node)
     /* [previous][next][first][last][top][bottom][index][help] */
1476 {
1477     pe_resource_t *rsc = this_node->details->remote_rsc;
1478     pe_resource_t *container = NULL;
1479     pe_node_t *host = NULL;
1480 
1481     /* If there is a node state entry for a (former) Pacemaker Remote node
1482      * but no resource creating that node, the node's connection resource will
1483      * be NULL. Consider it an offline remote node in that case.
1484      */
1485     if (rsc == NULL) {
1486         this_node->details->online = FALSE;
1487         goto remote_online_done;
1488     }
1489 
1490     container = rsc->container;
1491 
1492     if (container && pcmk__list_of_1(rsc->running_on)) {
1493         host = rsc->running_on->data;
1494     }
1495 
1496     /* If the resource is currently started, mark it online. */
1497     if (rsc->role == RSC_ROLE_STARTED) {
1498         crm_trace("%s node %s presumed ONLINE because connection resource is started",
1499                   (container? "Guest" : "Remote"), this_node->details->id);
1500         this_node->details->online = TRUE;
1501     }
1502 
1503     /* consider this node shutting down if transitioning start->stop */
1504     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
1505         crm_trace("%s node %s shutting down because connection resource is stopping",
1506                   (container? "Guest" : "Remote"), this_node->details->id);
1507         this_node->details->shutdown = TRUE;
1508     }
1509 
1510     /* Now check all the failure conditions. */
1511     if(container && pcmk_is_set(container->flags, pe_rsc_failed)) {
1512         crm_trace("Guest node %s UNCLEAN because guest resource failed",
1513                   this_node->details->id);
1514         this_node->details->online = FALSE;
1515         this_node->details->remote_requires_reset = TRUE;
1516 
1517     } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
1518         crm_trace("%s node %s OFFLINE because connection resource failed",
1519                   (container? "Guest" : "Remote"), this_node->details->id);
1520         this_node->details->online = FALSE;
1521 
1522     } else if (rsc->role == RSC_ROLE_STOPPED
1523         || (container && container->role == RSC_ROLE_STOPPED)) {
1524 
1525         crm_trace("%s node %s OFFLINE because its resource is stopped",
1526                   (container? "Guest" : "Remote"), this_node->details->id);
1527         this_node->details->online = FALSE;
1528         this_node->details->remote_requires_reset = FALSE;
1529 
1530     } else if (host && (host->details->online == FALSE)
1531                && host->details->unclean) {
1532         crm_trace("Guest node %s UNCLEAN because host is unclean",
1533                   this_node->details->id);
1534         this_node->details->online = FALSE;
1535         this_node->details->remote_requires_reset = TRUE;
1536     }
1537 
1538 remote_online_done:
1539     crm_trace("Remote node %s online=%s",
1540         this_node->details->id, this_node->details->online ? "TRUE" : "FALSE");
1541 }
1542 
1543 static void
1544 determine_online_status(const xmlNode *node_state, pe_node_t *this_node,
     /* [previous][next][first][last][top][bottom][index][help] */
1545                         pe_working_set_t *data_set)
1546 {
1547     gboolean online = FALSE;
1548     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1549 
1550     CRM_CHECK(this_node != NULL, return);
1551 
1552     this_node->details->shutdown = FALSE;
1553     this_node->details->expected_up = FALSE;
1554 
1555     if (pe__shutdown_requested(this_node)) {
1556         this_node->details->shutdown = TRUE;
1557 
1558     } else if (pcmk__str_eq(exp_state, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1559         this_node->details->expected_up = TRUE;
1560     }
1561 
1562     if (this_node->details->type == node_ping) {
1563         this_node->details->unclean = FALSE;
1564         online = FALSE;         /* As far as resource management is concerned,
1565                                  * the node is safely offline.
1566                                  * Anyone caught abusing this logic will be shot
1567                                  */
1568 
1569     } else if (!pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1570         online = determine_online_status_no_fencing(data_set, node_state, this_node);
1571 
1572     } else {
1573         online = determine_online_status_fencing(data_set, node_state, this_node);
1574     }
1575 
1576     if (online) {
1577         this_node->details->online = TRUE;
1578 
1579     } else {
1580         /* remove node from contention */
1581         this_node->fixed = TRUE; // @COMPAT deprecated and unused
1582         this_node->weight = -INFINITY;
1583     }
1584 
1585     if (online && this_node->details->shutdown) {
1586         /* don't run resources here */
1587         this_node->fixed = TRUE; // @COMPAT deprecated and unused
1588         this_node->weight = -INFINITY;
1589     }
1590 
1591     if (this_node->details->type == node_ping) {
1592         crm_info("%s is not a Pacemaker node", pe__node_name(this_node));
1593 
1594     } else if (this_node->details->unclean) {
1595         pe_proc_warn("%s is unclean", pe__node_name(this_node));
1596 
1597     } else if (this_node->details->online) {
1598         crm_info("%s is %s", pe__node_name(this_node),
1599                  this_node->details->shutdown ? "shutting down" :
1600                  this_node->details->pending ? "pending" :
1601                  this_node->details->standby ? "standby" :
1602                  this_node->details->maintenance ? "maintenance" : "online");
1603 
1604     } else {
1605         crm_trace("%s is offline", pe__node_name(this_node));
1606     }
1607 }
1608 
/*!
 * \internal
 * \brief Locate the last character of a resource ID's base name
 *
 * A clone suffix is a colon followed by one or more digits at the very end of
 * the ID. The first character of the ID is never treated as part of a suffix.
 *
 * \param[in] id  Resource ID to check
 *
 * \return Pointer to the last character of the base name (the character
 *         before the suffix colon if there is a suffix, otherwise the last
 *         character of \p id), or NULL if \p id is NULL or empty
 */
const char *
pe_base_name_end(const char *id)
{
    const char *last = NULL;
    const char *pos = NULL;

    if ((id == NULL) || (id[0] == '\0')) {
        return NULL;
    }
    last = id + strlen(id) - 1;

    // Walk backwards over any trailing digits, stopping short of id[0]
    pos = last;
    while ((pos > id) && (*pos >= '0') && (*pos <= '9')) {
        --pos;
    }

    if ((pos > id) && (*pos == ':')) {
        // A colon with nothing after it is not a suffix, so it stays included
        return (pos == last)? pos : (pos - 1);
    }

    // Either a non-suffix character or the start of the ID was reached
    return last;
}
1646 
/*!
 * \internal
 * \brief Copy a resource ID without any clone suffix
 *
 * \param[in] last_rsc_id  Resource ID to check
 *
 * \return Newly allocated string holding the base name of \p last_rsc_id
 * \note The caller is responsible for calling free() on the result. Both a
 *       missing base name and an allocation failure trigger an assertion, so
 *       callers can assume the result is not NULL.
 */
char *
clone_strip(const char *last_rsc_id)
{
    char *basename = NULL;
    size_t base_name_len = 0;
    const char *end = pe_base_name_end(last_rsc_id);

    CRM_ASSERT(end);

    // end points at the last base-name character, hence the +1
    base_name_len = end - last_rsc_id + 1;

    basename = strndup(last_rsc_id, base_name_len);
    CRM_ASSERT(basename);
    return basename;
}
1668 
/*!
 * \internal
 * \brief Get the name of the first instance of a cloned resource
 *
 * \param[in] last_rsc_id  Resource ID to check
 *
 * \return Pointer to newly allocated string with resource's base name plus :0
 * \note It is the caller's responsibility to free() the result.
 *       This asserts on error, so callers can assume result is not NULL.
 */
char *
clone_zero(const char *last_rsc_id)
{
    const char *end = pe_base_name_end(last_rsc_id);
    size_t base_name_len = 0;
    char *zero = NULL;

    /* Check the pointer before doing arithmetic with it: subtracting from a
     * NULL pointer is undefined behavior, so the length must not be computed
     * until the assertion has passed.
     */
    CRM_ASSERT(end);
    base_name_len = end - last_rsc_id + 1;

    // Room for the base name, ':', '0' and the terminator (calloc zero-fills)
    zero = calloc(base_name_len + 3, sizeof(char));
    CRM_ASSERT(zero);
    memcpy(zero, last_rsc_id, base_name_len);
    zero[base_name_len] = ':';
    zero[base_name_len + 1] = '0';
    return zero;
}
1694 
1695 static pe_resource_t *
1696 create_fake_resource(const char *rsc_id, const xmlNode *rsc_entry,
     /* [previous][next][first][last][top][bottom][index][help] */
1697                      pe_working_set_t *data_set)
1698 {
1699     pe_resource_t *rsc = NULL;
1700     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
1701 
1702     copy_in_properties(xml_rsc, rsc_entry);
1703     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
1704     crm_log_xml_debug(xml_rsc, "Orphan resource");
1705 
1706     if (pe__unpack_resource(xml_rsc, &rsc, NULL, data_set) != pcmk_rc_ok) {
1707         return NULL;
1708     }
1709 
1710     if (xml_contains_remote_node(xml_rsc)) {
1711         pe_node_t *node;
1712 
1713         crm_debug("Detected orphaned remote node %s", rsc_id);
1714         node = pe_find_node(data_set->nodes, rsc_id);
1715         if (node == NULL) {
1716                 node = pe_create_node(rsc_id, rsc_id, "remote", NULL, data_set);
1717         }
1718         link_rsc2remotenode(data_set, rsc);
1719 
1720         if (node) {
1721             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
1722             node->details->shutdown = TRUE;
1723         }
1724     }
1725 
1726     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
1727         /* This orphaned rsc needs to be mapped to a container. */
1728         crm_trace("Detected orphaned container filler %s", rsc_id);
1729         pe__set_resource_flags(rsc, pe_rsc_orphan_container_filler);
1730     }
1731     pe__set_resource_flags(rsc, pe_rsc_orphan);
1732     data_set->resources = g_list_append(data_set->resources, rsc);
1733     return rsc;
1734 }
1735 
1736 /*!
1737  * \internal
1738  * \brief Create orphan instance for anonymous clone resource history
1739  *
1740  * \param[in,out] parent    Clone resource that orphan will be added to
1741  * \param[in]     rsc_id    Orphan's resource ID
1742  * \param[in]     node      Where orphan is active (for logging only)
1743  * \param[in,out] data_set  Cluster working set
1744  *
1745  * \return Newly added orphaned instance of \p parent
1746  */
1747 static pe_resource_t *
1748 create_anonymous_orphan(pe_resource_t *parent, const char *rsc_id,
     /* [previous][next][first][last][top][bottom][index][help] */
1749                         const pe_node_t *node, pe_working_set_t *data_set)
1750 {
1751     pe_resource_t *top = pe__create_clone_child(parent, data_set);
1752 
1753     // find_rsc() because we might be a cloned group
1754     pe_resource_t *orphan = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
1755 
1756     pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s",
1757                  top->id, parent->id, rsc_id, pe__node_name(node));
1758     return orphan;
1759 }
1760 
1761 /*!
1762  * \internal
1763  * \brief Check a node for an instance of an anonymous clone
1764  *
1765  * Return a child instance of the specified anonymous clone, in order of
1766  * preference: (1) the instance running on the specified node, if any;
1767  * (2) an inactive instance (i.e. within the total of clone-max instances);
1768  * (3) a newly created orphan (i.e. clone-max instances are already active).
1769  *
1770  * \param[in,out] data_set  Cluster information
1771  * \param[in]     node      Node on which to check for instance
1772  * \param[in,out] parent    Clone to check
1773  * \param[in]     rsc_id    Name of cloned resource in history (without instance)
1774  */
1775 static pe_resource_t *
1776 find_anonymous_clone(pe_working_set_t *data_set, const pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1777                      pe_resource_t *parent, const char *rsc_id)
1778 {
1779     GList *rIter = NULL;
1780     pe_resource_t *rsc = NULL;
1781     pe_resource_t *inactive_instance = NULL;
1782     gboolean skip_inactive = FALSE;
1783 
1784     CRM_ASSERT(parent != NULL);
1785     CRM_ASSERT(pe_rsc_is_clone(parent));
1786     CRM_ASSERT(!pcmk_is_set(parent->flags, pe_rsc_unique));
1787 
1788     // Check for active (or partially active, for cloned groups) instance
1789     pe_rsc_trace(parent, "Looking for %s on %s in %s",
1790                  rsc_id, pe__node_name(node), parent->id);
1791     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1792         GList *locations = NULL;
1793         pe_resource_t *child = rIter->data;
1794 
1795         /* Check whether this instance is already known to be active or pending
1796          * anywhere, at this stage of unpacking. Because this function is called
1797          * for a resource before the resource's individual operation history
1798          * entries are unpacked, locations will generally not contain the
1799          * desired node.
1800          *
1801          * However, there are three exceptions:
1802          * (1) when child is a cloned group and we have already unpacked the
1803          *     history of another member of the group on the same node;
1804          * (2) when we've already unpacked the history of another numbered
1805          *     instance on the same node (which can happen if globally-unique
1806          *     was flipped from true to false); and
1807          * (3) when we re-run calculations on the same data set as part of a
1808          *     simulation.
1809          */
1810         child->fns->location(child, &locations, 2);
1811         if (locations) {
1812             /* We should never associate the same numbered anonymous clone
1813              * instance with multiple nodes, and clone instances can't migrate,
1814              * so there must be only one location, regardless of history.
1815              */
1816             CRM_LOG_ASSERT(locations->next == NULL);
1817 
1818             if (((pe_node_t *)locations->data)->details == node->details) {
1819                 /* This child instance is active on the requested node, so check
1820                  * for a corresponding configured resource. We use find_rsc()
1821                  * instead of child because child may be a cloned group, and we
1822                  * need the particular member corresponding to rsc_id.
1823                  *
1824                  * If the history entry is orphaned, rsc will be NULL.
1825                  */
1826                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1827                 if (rsc) {
1828                     /* If there are multiple instance history entries for an
1829                      * anonymous clone in a single node's history (which can
1830                      * happen if globally-unique is switched from true to
1831                      * false), we want to consider the instances beyond the
1832                      * first as orphans, even if there are inactive instance
1833                      * numbers available.
1834                      */
1835                     if (rsc->running_on) {
1836                         crm_notice("Active (now-)anonymous clone %s has "
1837                                    "multiple (orphan) instance histories on %s",
1838                                    parent->id, pe__node_name(node));
1839                         skip_inactive = TRUE;
1840                         rsc = NULL;
1841                     } else {
1842                         pe_rsc_trace(parent, "Resource %s, active", rsc->id);
1843                     }
1844                 }
1845             }
1846             g_list_free(locations);
1847 
1848         } else {
1849             pe_rsc_trace(parent, "Resource %s, skip inactive", child->id);
1850             if (!skip_inactive && !inactive_instance
1851                 && !pcmk_is_set(child->flags, pe_rsc_block)) {
1852                 // Remember one inactive instance in case we don't find active
1853                 inactive_instance = parent->fns->find_rsc(child, rsc_id, NULL,
1854                                                           pe_find_clone);
1855 
1856                 /* ... but don't use it if it was already associated with a
1857                  * pending action on another node
1858                  */
1859                 if (inactive_instance && inactive_instance->pending_node
1860                     && (inactive_instance->pending_node->details != node->details)) {
1861                     inactive_instance = NULL;
1862                 }
1863             }
1864         }
1865     }
1866 
1867     if ((rsc == NULL) && !skip_inactive && (inactive_instance != NULL)) {
1868         pe_rsc_trace(parent, "Resource %s, empty slot", inactive_instance->id);
1869         rsc = inactive_instance;
1870     }
1871 
1872     /* If the resource has "requires" set to "quorum" or "nothing", and we don't
1873      * have a clone instance for every node, we don't want to consume a valid
1874      * instance number for unclean nodes. Such instances may appear to be active
1875      * according to the history, but should be considered inactive, so we can
1876      * start an instance elsewhere. Treat such instances as orphans.
1877      *
1878      * An exception is instances running on guest nodes -- since guest node
1879      * "fencing" is actually just a resource stop, requires shouldn't apply.
1880      *
1881      * @TODO Ideally, we'd use an inactive instance number if it is not needed
1882      * for any clean instances. However, we don't know that at this point.
1883      */
1884     if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_needs_fencing)
1885         && (!node->details->online || node->details->unclean)
1886         && !pe__is_guest_node(node)
1887         && !pe__is_universal_clone(parent, data_set)) {
1888 
1889         rsc = NULL;
1890     }
1891 
1892     if (rsc == NULL) {
1893         rsc = create_anonymous_orphan(parent, rsc_id, node, data_set);
1894         pe_rsc_trace(parent, "Resource %s, orphan", rsc->id);
1895     }
1896     return rsc;
1897 }
1898 
1899 static pe_resource_t *
1900 unpack_find_resource(pe_working_set_t *data_set, const pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1901                      const char *rsc_id)
1902 {
1903     pe_resource_t *rsc = NULL;
1904     pe_resource_t *parent = NULL;
1905 
1906     crm_trace("looking for %s", rsc_id);
1907     rsc = pe_find_resource(data_set->resources, rsc_id);
1908 
1909     if (rsc == NULL) {
1910         /* If we didn't find the resource by its name in the operation history,
1911          * check it again as a clone instance. Even when clone-max=0, we create
1912          * a single :0 orphan to match against here.
1913          */
1914         char *clone0_id = clone_zero(rsc_id);
1915         pe_resource_t *clone0 = pe_find_resource(data_set->resources, clone0_id);
1916 
1917         if (clone0 && !pcmk_is_set(clone0->flags, pe_rsc_unique)) {
1918             rsc = clone0;
1919             parent = uber_parent(clone0);
1920             crm_trace("%s found as %s (%s)", rsc_id, clone0_id, parent->id);
1921         } else {
1922             crm_trace("%s is not known as %s either (orphan)",
1923                       rsc_id, clone0_id);
1924         }
1925         free(clone0_id);
1926 
1927     } else if (rsc->variant > pe_native) {
1928         crm_trace("Resource history for %s is orphaned because it is no longer primitive",
1929                   rsc_id);
1930         return NULL;
1931 
1932     } else {
1933         parent = uber_parent(rsc);
1934     }
1935 
1936     if (pe_rsc_is_anon_clone(parent)) {
1937 
1938         if (pe_rsc_is_bundled(parent)) {
1939             rsc = pe__find_bundle_replica(parent->parent, node);
1940         } else {
1941             char *base = clone_strip(rsc_id);
1942 
1943             rsc = find_anonymous_clone(data_set, node, parent, base);
1944             free(base);
1945             CRM_ASSERT(rsc != NULL);
1946         }
1947     }
1948 
1949     if (rsc && !pcmk__str_eq(rsc_id, rsc->id, pcmk__str_casei)
1950         && !pcmk__str_eq(rsc_id, rsc->clone_name, pcmk__str_casei)) {
1951 
1952         pcmk__str_update(&rsc->clone_name, rsc_id);
1953         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
1954                      rsc_id, pe__node_name(node), rsc->id,
1955                      (pcmk_is_set(rsc->flags, pe_rsc_orphan)? " (ORPHAN)" : ""));
1956     }
1957     return rsc;
1958 }
1959 
1960 static pe_resource_t *
1961 process_orphan_resource(const xmlNode *rsc_entry, const pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1962                         pe_working_set_t *data_set)
1963 {
1964     pe_resource_t *rsc = NULL;
1965     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
1966 
1967     crm_debug("Detected orphan resource %s on %s", rsc_id, pe__node_name(node));
1968     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
1969     if (rsc == NULL) {
1970         return NULL;
1971     }
1972 
1973     if (!pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)) {
1974         pe__clear_resource_flags(rsc, pe_rsc_managed);
1975 
1976     } else {
1977         CRM_CHECK(rsc != NULL, return NULL);
1978         pe_rsc_trace(rsc, "Added orphan %s", rsc->id);
1979         resource_location(rsc, NULL, -INFINITY, "__orphan_do_not_run__", data_set);
1980     }
1981     return rsc;
1982 }
1983 
1984 static void
1985 process_rsc_state(pe_resource_t * rsc, pe_node_t * node,
     /* [previous][next][first][last][top][bottom][index][help] */
1986                   enum action_fail_response on_fail)
1987 {
1988     pe_node_t *tmpnode = NULL;
1989     char *reason = NULL;
1990     enum action_fail_response save_on_fail = action_fail_ignore;
1991 
1992     CRM_ASSERT(rsc);
1993     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
1994                  rsc->id, role2text(rsc->role), pe__node_name(node),
1995                  fail2text(on_fail));
1996 
1997     /* process current state */
1998     if (rsc->role != RSC_ROLE_UNKNOWN) {
1999         pe_resource_t *iter = rsc;
2000 
2001         while (iter) {
2002             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
2003                 pe_node_t *n = pe__copy_node(node);
2004 
2005                 pe_rsc_trace(rsc, "%s%s%s known on %s",
2006                              rsc->id,
2007                              ((rsc->clone_name == NULL)? "" : " also known as "),
2008                              ((rsc->clone_name == NULL)? "" : rsc->clone_name),
2009                              pe__node_name(n));
2010                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
2011             }
2012             if (pcmk_is_set(iter->flags, pe_rsc_unique)) {
2013                 break;
2014             }
2015             iter = iter->parent;
2016         }
2017     }
2018 
2019     /* If a managed resource is believed to be running, but node is down ... */
2020     if (rsc->role > RSC_ROLE_STOPPED
2021         && node->details->online == FALSE
2022         && node->details->maintenance == FALSE
2023         && pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2024 
2025         gboolean should_fence = FALSE;
2026 
2027         /* If this is a guest node, fence it (regardless of whether fencing is
2028          * enabled, because guest node fencing is done by recovery of the
2029          * container resource rather than by the fencer). Mark the resource
2030          * we're processing as failed. When the guest comes back up, its
2031          * operation history in the CIB will be cleared, freeing the affected
2032          * resource to run again once we are sure we know its state.
2033          */
2034         if (pe__is_guest_node(node)) {
2035             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2036             should_fence = TRUE;
2037 
2038         } else if (pcmk_is_set(rsc->cluster->flags, pe_flag_stonith_enabled)) {
2039             if (pe__is_remote_node(node) && node->details->remote_rsc
2040                 && !pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
2041 
2042                 /* Setting unseen means that fencing of the remote node will
2043                  * occur only if the connection resource is not going to start
2044                  * somewhere. This allows connection resources on a failed
2045                  * cluster node to move to another node without requiring the
2046                  * remote nodes to be fenced as well.
2047                  */
2048                 node->details->unseen = TRUE;
2049                 reason = crm_strdup_printf("%s is active there (fencing will be"
2050                                            " revoked if remote connection can "
2051                                            "be re-established elsewhere)",
2052                                            rsc->id);
2053             }
2054             should_fence = TRUE;
2055         }
2056 
2057         if (should_fence) {
2058             if (reason == NULL) {
2059                reason = crm_strdup_printf("%s is thought to be active there", rsc->id);
2060             }
2061             pe_fence_node(rsc->cluster, node, reason, FALSE);
2062         }
2063         free(reason);
2064     }
2065 
2066     /* In order to calculate priority_fencing_delay correctly, save the failure information and pass it to native_add_running(). */
2067     save_on_fail = on_fail;
2068 
2069     if (node->details->unclean) {
2070         /* No extra processing needed
2071          * Also allows resources to be started again after a node is shot
2072          */
2073         on_fail = action_fail_ignore;
2074     }
2075 
2076     switch (on_fail) {
2077         case action_fail_ignore:
2078             /* nothing to do */
2079             break;
2080 
2081         case action_fail_demote:
2082             pe__set_resource_flags(rsc, pe_rsc_failed);
2083             demote_action(rsc, node, FALSE);
2084             break;
2085 
2086         case action_fail_fence:
2087             /* treat it as if it is still running
2088              * but also mark the node as unclean
2089              */
2090             reason = crm_strdup_printf("%s failed there", rsc->id);
2091             pe_fence_node(rsc->cluster, node, reason, FALSE);
2092             free(reason);
2093             break;
2094 
2095         case action_fail_standby:
2096             node->details->standby = TRUE;
2097             node->details->standby_onfail = TRUE;
2098             break;
2099 
2100         case action_fail_block:
2101             /* is_managed == FALSE will prevent any
2102              * actions being sent for the resource
2103              */
2104             pe__clear_resource_flags(rsc, pe_rsc_managed);
2105             pe__set_resource_flags(rsc, pe_rsc_block);
2106             break;
2107 
2108         case action_fail_migrate:
2109             /* make sure it comes up somewhere else
2110              * or not at all
2111              */
2112             resource_location(rsc, node, -INFINITY, "__action_migration_auto__",
2113                               rsc->cluster);
2114             break;
2115 
2116         case action_fail_stop:
2117             pe__set_next_role(rsc, RSC_ROLE_STOPPED, "on-fail=stop");
2118             break;
2119 
2120         case action_fail_recover:
2121             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2122                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2123                 stop_action(rsc, node, FALSE);
2124             }
2125             break;
2126 
2127         case action_fail_restart_container:
2128             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2129             if (rsc->container && pe_rsc_is_bundled(rsc)) {
2130                 /* A bundle's remote connection can run on a different node than
2131                  * the bundle's container. We don't necessarily know where the
2132                  * container is running yet, so remember it and add a stop
2133                  * action for it later.
2134                  */
2135                 rsc->cluster->stop_needed =
2136                     g_list_prepend(rsc->cluster->stop_needed, rsc->container);
2137             } else if (rsc->container) {
2138                 stop_action(rsc->container, node, FALSE);
2139             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2140                 stop_action(rsc, node, FALSE);
2141             }
2142             break;
2143 
2144         case action_fail_reset_remote:
2145             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2146             if (pcmk_is_set(rsc->cluster->flags, pe_flag_stonith_enabled)) {
2147                 tmpnode = NULL;
2148                 if (rsc->is_remote_node) {
2149                     tmpnode = pe_find_node(rsc->cluster->nodes, rsc->id);
2150                 }
2151                 if (tmpnode &&
2152                     pe__is_remote_node(tmpnode) &&
2153                     tmpnode->details->remote_was_fenced == 0) {
2154 
2155                     /* The remote connection resource failed in a way that
2156                      * should result in fencing the remote node.
2157                      */
2158                     pe_fence_node(rsc->cluster, tmpnode,
2159                                   "remote connection is unrecoverable", FALSE);
2160                 }
2161             }
2162 
2163             /* require the stop action regardless if fencing is occurring or not. */
2164             if (rsc->role > RSC_ROLE_STOPPED) {
2165                 stop_action(rsc, node, FALSE);
2166             }
2167 
2168             /* if reconnect delay is in use, prevent the connection from exiting the
2169              * "STOPPED" role until the failure is cleared by the delay timeout. */
2170             if (rsc->remote_reconnect_ms) {
2171                 pe__set_next_role(rsc, RSC_ROLE_STOPPED, "remote reset");
2172             }
2173             break;
2174     }
2175 
2176     /* ensure a remote-node connection failure forces an unclean remote-node
2177      * to be fenced. By setting unseen = FALSE, the remote-node failure will
2178      * result in a fencing operation regardless if we're going to attempt to 
2179      * reconnect to the remote-node in this transition or not. */
2180     if (pcmk_is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
2181         tmpnode = pe_find_node(rsc->cluster->nodes, rsc->id);
2182         if (tmpnode && tmpnode->details->unclean) {
2183             tmpnode->details->unseen = FALSE;
2184         }
2185     }
2186 
2187     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2188         if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
2189             if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2190                 pcmk__config_warn("Detected active orphan %s running on %s",
2191                                   rsc->id, pe__node_name(node));
2192             } else {
2193                 pcmk__config_warn("Resource '%s' must be stopped manually on "
2194                                   "%s because cluster is configured not to "
2195                                   "stop active orphans",
2196                                   rsc->id, pe__node_name(node));
2197             }
2198         }
2199 
2200         native_add_running(rsc, node, rsc->cluster,
2201                            (save_on_fail != action_fail_ignore));
2202         switch (on_fail) {
2203             case action_fail_ignore:
2204                 break;
2205             case action_fail_demote:
2206             case action_fail_block:
2207                 pe__set_resource_flags(rsc, pe_rsc_failed);
2208                 break;
2209             default:
2210                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2211                 break;
2212         }
2213 
2214     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
2215         /* Only do this for older status sections that included instance numbers
2216          * Otherwise stopped instances will appear as orphans
2217          */
2218         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
2219         free(rsc->clone_name);
2220         rsc->clone_name = NULL;
2221 
2222     } else {
2223         GList *possible_matches = pe__resource_actions(rsc, node, RSC_STOP,
2224                                                        FALSE);
2225         GList *gIter = possible_matches;
2226 
2227         for (; gIter != NULL; gIter = gIter->next) {
2228             pe_action_t *stop = (pe_action_t *) gIter->data;
2229 
2230             pe__set_action_flags(stop, pe_action_optional);
2231         }
2232 
2233         g_list_free(possible_matches);
2234     }
2235 
2236     /* A successful stop after migrate_to on the migration source doesn't make
2237      * the partially migrated resource stopped on the migration target.
2238      */
2239     if (rsc->role == RSC_ROLE_STOPPED
2240         && rsc->partial_migration_source
2241         && rsc->partial_migration_source->details == node->details
2242         && rsc->partial_migration_target
2243         && rsc->running_on) {
2244 
2245         rsc->role = RSC_ROLE_STARTED;
2246     }
2247 }
2248 
2249 /* create active recurring operations as optional */
2250 static void
2251 process_recurring(pe_node_t * node, pe_resource_t * rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
2252                   int start_index, int stop_index,
2253                   GList *sorted_op_list, pe_working_set_t * data_set)
2254 {
2255     int counter = -1;
2256     const char *task = NULL;
2257     const char *status = NULL;
2258     GList *gIter = sorted_op_list;
2259 
2260     CRM_ASSERT(rsc);
2261     pe_rsc_trace(rsc, "%s: Start index %d, stop index = %d", rsc->id, start_index, stop_index);
2262 
2263     for (; gIter != NULL; gIter = gIter->next) {
2264         xmlNode *rsc_op = (xmlNode *) gIter->data;
2265 
2266         guint interval_ms = 0;
2267         char *key = NULL;
2268         const char *id = ID(rsc_op);
2269 
2270         counter++;
2271 
2272         if (node->details->online == FALSE) {
2273             pe_rsc_trace(rsc, "Skipping %s on %s: node is offline",
2274                          rsc->id, pe__node_name(node));
2275             break;
2276 
2277             /* Need to check if there's a monitor for role="Stopped" */
2278         } else if (start_index < stop_index && counter <= stop_index) {
2279             pe_rsc_trace(rsc, "Skipping %s on %s: resource is not active",
2280                          id, pe__node_name(node));
2281             continue;
2282 
2283         } else if (counter < start_index) {
2284             pe_rsc_trace(rsc, "Skipping %s on %s: old %d",
2285                          id, pe__node_name(node), counter);
2286             continue;
2287         }
2288 
2289         crm_element_value_ms(rsc_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
2290         if (interval_ms == 0) {
2291             pe_rsc_trace(rsc, "Skipping %s on %s: non-recurring",
2292                          id, pe__node_name(node));
2293             continue;
2294         }
2295 
2296         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2297         if (pcmk__str_eq(status, "-1", pcmk__str_casei)) {
2298             pe_rsc_trace(rsc, "Skipping %s on %s: status",
2299                          id, pe__node_name(node));
2300             continue;
2301         }
2302         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2303         /* create the action */
2304         key = pcmk__op_key(rsc->id, task, interval_ms);
2305         pe_rsc_trace(rsc, "Creating %s on %s", key, pe__node_name(node));
2306         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
2307     }
2308 }
2309 
2310 void
2311 calculate_active_ops(const GList *sorted_op_list, int *start_index,
     /* [previous][next][first][last][top][bottom][index][help] */
2312                      int *stop_index)
2313 {
2314     int counter = -1;
2315     int implied_monitor_start = -1;
2316     int implied_clone_start = -1;
2317     const char *task = NULL;
2318     const char *status = NULL;
2319 
2320     *stop_index = -1;
2321     *start_index = -1;
2322 
2323     for (const GList *iter = sorted_op_list; iter != NULL; iter = iter->next) {
2324         const xmlNode *rsc_op = (const xmlNode *) iter->data;
2325 
2326         counter++;
2327 
2328         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2329         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2330 
2331         if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)
2332             && pcmk__str_eq(status, "0", pcmk__str_casei)) {
2333             *stop_index = counter;
2334 
2335         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_START, CRMD_ACTION_MIGRATED, NULL)) {
2336             *start_index = counter;
2337 
2338         } else if ((implied_monitor_start <= *stop_index) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
2339             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
2340 
2341             if (pcmk__strcase_any_of(rc, "0", "8", NULL)) {
2342                 implied_monitor_start = counter;
2343             }
2344         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_PROMOTE, CRMD_ACTION_DEMOTE, NULL)) {
2345             implied_clone_start = counter;
2346         }
2347     }
2348 
2349     if (*start_index == -1) {
2350         if (implied_clone_start != -1) {
2351             *start_index = implied_clone_start;
2352         } else if (implied_monitor_start != -1) {
2353             *start_index = implied_monitor_start;
2354         }
2355     }
2356 }
2357 
2358 // If resource history entry has shutdown lock, remember lock node and time
2359 static void
2360 unpack_shutdown_lock(const xmlNode *rsc_entry, pe_resource_t *rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
2361                      const pe_node_t *node, pe_working_set_t *data_set)
2362 {
2363     time_t lock_time = 0;   // When lock started (i.e. node shutdown time)
2364 
2365     if ((crm_element_value_epoch(rsc_entry, XML_CONFIG_ATTR_SHUTDOWN_LOCK,
2366                                  &lock_time) == pcmk_ok) && (lock_time != 0)) {
2367 
2368         if ((data_set->shutdown_lock > 0)
2369             && (get_effective_time(data_set)
2370                 > (lock_time + data_set->shutdown_lock))) {
2371             pe_rsc_info(rsc, "Shutdown lock for %s on %s expired",
2372                         rsc->id, pe__node_name(node));
2373             pe__clear_resource_history(rsc, node, data_set);
2374         } else {
2375             /* @COMPAT I don't like breaking const signatures, but
2376              * rsc->lock_node should really be const -- we just can't change it
2377              * until the next API compatibility break.
2378              */
2379             rsc->lock_node = (pe_node_t *) node;
2380             rsc->lock_time = lock_time;
2381         }
2382     }
2383 }
2384 
2385 /*!
2386  * \internal
2387  * \brief Unpack one lrm_resource entry from a node's CIB status
2388  *
2389  * \param[in,out] node       Node whose status is being unpacked
2390  * \param[in]     rsc_entry  lrm_resource XML being unpacked
2391  * \param[in,out] data_set   Cluster working set
2392  *
2393  * \return Resource corresponding to the entry, or NULL if no operation history
2394  */
2395 static pe_resource_t *
2396 unpack_lrm_resource(pe_node_t *node, const xmlNode *lrm_resource,
     /* [previous][next][first][last][top][bottom][index][help] */
2397                     pe_working_set_t *data_set)
2398 {
2399     GList *gIter = NULL;
2400     int stop_index = -1;
2401     int start_index = -1;
2402     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
2403 
2404     const char *rsc_id = ID(lrm_resource);
2405 
2406     pe_resource_t *rsc = NULL;
2407     GList *op_list = NULL;
2408     GList *sorted_op_list = NULL;
2409 
2410     xmlNode *rsc_op = NULL;
2411     xmlNode *last_failure = NULL;
2412 
2413     enum action_fail_response on_fail = action_fail_ignore;
2414     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
2415 
2416     if (rsc_id == NULL) {
2417         crm_warn("Ignoring malformed " XML_LRM_TAG_RESOURCE
2418                  " entry without id");
2419         return NULL;
2420     }
2421     crm_trace("Unpacking " XML_LRM_TAG_RESOURCE " for %s on %s",
2422               rsc_id, pe__node_name(node));
2423 
2424     // Build a list of individual lrm_rsc_op entries, so we can sort them
2425     for (rsc_op = first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP);
2426          rsc_op != NULL; rsc_op = crm_next_same_xml(rsc_op)) {
2427 
2428         op_list = g_list_prepend(op_list, rsc_op);
2429     }
2430 
2431     if (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2432         if (op_list == NULL) {
2433             // If there are no operations, there is nothing to do
2434             return NULL;
2435         }
2436     }
2437 
2438     /* find the resource */
2439     rsc = unpack_find_resource(data_set, node, rsc_id);
2440     if (rsc == NULL) {
2441         if (op_list == NULL) {
2442             // If there are no operations, there is nothing to do
2443             return NULL;
2444         } else {
2445             rsc = process_orphan_resource(lrm_resource, node, data_set);
2446         }
2447     }
2448     CRM_ASSERT(rsc != NULL);
2449 
2450     // Check whether the resource is "shutdown-locked" to this node
2451     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2452         unpack_shutdown_lock(lrm_resource, rsc, node, data_set);
2453     }
2454 
2455     /* process operations */
2456     saved_role = rsc->role;
2457     rsc->role = RSC_ROLE_UNKNOWN;
2458     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
2459 
2460     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
2461         xmlNode *rsc_op = (xmlNode *) gIter->data;
2462 
2463         unpack_rsc_op(rsc, node, rsc_op, &last_failure, &on_fail);
2464     }
2465 
2466     /* create active recurring operations as optional */
2467     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
2468     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
2469 
2470     /* no need to free the contents */
2471     g_list_free(sorted_op_list);
2472 
2473     process_rsc_state(rsc, node, on_fail);
2474 
2475     if (get_target_role(rsc, &req_role)) {
2476         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
2477             pe__set_next_role(rsc, req_role, XML_RSC_ATTR_TARGET_ROLE);
2478 
2479         } else if (req_role > rsc->next_role) {
2480             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
2481                         " with requested next role %s",
2482                         rsc->id, role2text(rsc->next_role), role2text(req_role));
2483         }
2484     }
2485 
2486     if (saved_role > rsc->role) {
2487         rsc->role = saved_role;
2488     }
2489 
2490     return rsc;
2491 }
2492 
2493 static void
2494 handle_orphaned_container_fillers(const xmlNode *lrm_rsc_list,
     /* [previous][next][first][last][top][bottom][index][help] */
2495                                   pe_working_set_t *data_set)
2496 {
2497     for (const xmlNode *rsc_entry = pcmk__xe_first_child(lrm_rsc_list);
2498          rsc_entry != NULL; rsc_entry = pcmk__xe_next(rsc_entry)) {
2499 
2500         pe_resource_t *rsc;
2501         pe_resource_t *container;
2502         const char *rsc_id;
2503         const char *container_id;
2504 
2505         if (!pcmk__str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, pcmk__str_casei)) {
2506             continue;
2507         }
2508 
2509         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
2510         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2511         if (container_id == NULL || rsc_id == NULL) {
2512             continue;
2513         }
2514 
2515         container = pe_find_resource(data_set->resources, container_id);
2516         if (container == NULL) {
2517             continue;
2518         }
2519 
2520         rsc = pe_find_resource(data_set->resources, rsc_id);
2521         if (rsc == NULL ||
2522             !pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler) ||
2523             rsc->container != NULL) {
2524             continue;
2525         }
2526 
2527         pe_rsc_trace(rsc, "Mapped container of orphaned resource %s to %s",
2528                      rsc->id, container_id);
2529         rsc->container = container;
2530         container->fillers = g_list_append(container->fillers, rsc);
2531     }
2532 }
2533 
2534 /*!
2535  * \internal
2536  * \brief Unpack one node's lrm status section
2537  *
2538  * \param[in,out] node      Node whose status is being unpacked
2539  * \param[in]     xml       CIB node state XML
2540  * \param[in,out] data_set  Cluster working set
2541  */
2542 static void
2543 unpack_node_lrm(pe_node_t *node, const xmlNode *xml, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
2544 {
2545     bool found_orphaned_container_filler = false;
2546 
2547     // Drill down to lrm_resources section
2548     xml = find_xml_node(xml, XML_CIB_TAG_LRM, FALSE);
2549     if (xml == NULL) {
2550         return;
2551     }
2552     xml = find_xml_node(xml, XML_LRM_TAG_RESOURCES, FALSE);
2553     if (xml == NULL) {
2554         return;
2555     }
2556 
2557     // Unpack each lrm_resource entry
2558     for (const xmlNode *rsc_entry = first_named_child(xml, XML_LRM_TAG_RESOURCE);
2559          rsc_entry != NULL; rsc_entry = crm_next_same_xml(rsc_entry)) {
2560 
2561         pe_resource_t *rsc = unpack_lrm_resource(node, rsc_entry, data_set);
2562 
2563         if ((rsc != NULL)
2564             && pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
2565             found_orphaned_container_filler = true;
2566         }
2567     }
2568 
2569     /* Now that all resource state has been unpacked for this node, map any
2570      * orphaned container fillers to their container resource.
2571      */
2572     if (found_orphaned_container_filler) {
2573         handle_orphaned_container_fillers(xml, data_set);
2574     }
2575 }
2576 
2577 static void
2578 set_active(pe_resource_t * rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
2579 {
2580     const pe_resource_t *top = pe__const_top_resource(rsc, false);
2581 
2582     if (top && pcmk_is_set(top->flags, pe_rsc_promotable)) {
2583         rsc->role = RSC_ROLE_UNPROMOTED;
2584     } else {
2585         rsc->role = RSC_ROLE_STARTED;
2586     }
2587 }
2588 
2589 static void
2590 set_node_score(gpointer key, gpointer value, gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
2591 {
2592     pe_node_t *node = value;
2593     int *score = user_data;
2594 
2595     node->weight = *score;
2596 }
2597 
// Absolute XPath to XML_CIB_TAG_STATE elements in the CIB status section
#define XPATH_NODE_STATE        \
    "/" XML_TAG_CIB             \
    "/" XML_CIB_TAG_STATUS      \
    "/" XML_CIB_TAG_STATE

// XPath suffix leading from a node state element to its lrm_resource entries
#define SUB_XPATH_LRM_RESOURCE  \
    "/" XML_CIB_TAG_LRM         \
    "/" XML_LRM_TAG_RESOURCES   \
    "/" XML_LRM_TAG_RESOURCE

// XPath suffix leading from an lrm_resource entry to its lrm_rsc_op entries
#define SUB_XPATH_LRM_RSC_OP    \
    "/" XML_LRM_TAG_RSC_OP
2604 
2605 static xmlNode *
2606 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
     /* [previous][next][first][last][top][bottom][index][help] */
2607             int target_rc, pe_working_set_t *data_set)
2608 {
2609     GString *xpath = NULL;
2610     xmlNode *xml = NULL;
2611 
2612     CRM_CHECK((resource != NULL) && (op != NULL) && (node != NULL),
2613               return NULL);
2614 
2615     xpath = g_string_sized_new(256);
2616     pcmk__g_strcat(xpath,
2617                    XPATH_NODE_STATE "[@" XML_ATTR_UNAME "='", node, "']"
2618                    SUB_XPATH_LRM_RESOURCE "[@" XML_ATTR_ID "='", resource, "']"
2619                    SUB_XPATH_LRM_RSC_OP "[@" XML_LRM_ATTR_TASK "='", op, "'",
2620                    NULL);
2621 
2622     /* Need to check against transition_magic too? */
2623     if ((source != NULL) && (strcmp(op, CRMD_ACTION_MIGRATE) == 0)) {
2624         pcmk__g_strcat(xpath,
2625                        " and @" XML_LRM_ATTR_MIGRATE_TARGET "='", source, "']",
2626                        NULL);
2627 
2628     } else if ((source != NULL) && (strcmp(op, CRMD_ACTION_MIGRATED) == 0)) {
2629         pcmk__g_strcat(xpath,
2630                        " and @" XML_LRM_ATTR_MIGRATE_SOURCE "='", source, "']",
2631                        NULL);
2632     } else {
2633         g_string_append_c(xpath, ']');
2634     }
2635 
2636     xml = get_xpath_object((const char *) xpath->str, data_set->input,
2637                            LOG_DEBUG);
2638     g_string_free(xpath, TRUE);
2639 
2640     if (xml && target_rc >= 0) {
2641         int rc = PCMK_OCF_UNKNOWN_ERROR;
2642         int status = PCMK_EXEC_ERROR;
2643 
2644         crm_element_value_int(xml, XML_LRM_ATTR_RC, &rc);
2645         crm_element_value_int(xml, XML_LRM_ATTR_OPSTATUS, &status);
2646         if ((rc != target_rc) || (status != PCMK_EXEC_DONE)) {
2647             return NULL;
2648         }
2649     }
2650     return xml;
2651 }
2652 
2653 static xmlNode *
2654 find_lrm_resource(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2655                   pe_working_set_t *data_set)
2656 {
2657     GString *xpath = NULL;
2658     xmlNode *xml = NULL;
2659 
2660     CRM_CHECK((rsc_id != NULL) && (node_name != NULL), return NULL);
2661 
2662     xpath = g_string_sized_new(256);
2663     pcmk__g_strcat(xpath,
2664                    XPATH_NODE_STATE "[@" XML_ATTR_UNAME "='", node_name, "']"
2665                    SUB_XPATH_LRM_RESOURCE "[@" XML_ATTR_ID "='", rsc_id, "']",
2666                    NULL);
2667 
2668     xml = get_xpath_object((const char *) xpath->str, data_set->input,
2669                            LOG_DEBUG);
2670 
2671     g_string_free(xpath, TRUE);
2672     return xml;
2673 }
2674 
2675 /*!
2676  * \internal
2677  * \brief Check whether a resource has no completed action history on a node
2678  *
2679  * \param[in,out] rsc        Resource to check
2680  * \param[in]     node_name  Node to check
2681  *
2682  * \return true if \p rsc_id is unknown on \p node_name, otherwise false
2683  */
2684 static bool
2685 unknown_on_node(pe_resource_t *rsc, const char *node_name)
     /* [previous][next][first][last][top][bottom][index][help] */
2686 {
2687     bool result = false;
2688     xmlXPathObjectPtr search;
2689     GString *xpath = g_string_sized_new(256);
2690 
2691     pcmk__g_strcat(xpath,
2692                    XPATH_NODE_STATE "[@" XML_ATTR_UNAME "='", node_name, "']"
2693                    SUB_XPATH_LRM_RESOURCE "[@" XML_ATTR_ID "='", rsc->id, "']"
2694                    SUB_XPATH_LRM_RSC_OP "[@" XML_LRM_ATTR_RC "!='193']",
2695                    NULL);
2696     search = xpath_search(rsc->cluster->input, (const char *) xpath->str);
2697     result = (numXpathResults(search) == 0);
2698     freeXpathObject(search);
2699     g_string_free(xpath, TRUE);
2700     return result;
2701 }
2702 
2703 /*!
2704  * \brief Check whether a probe/monitor indicating the resource was not running
2705  * on a node happened after some event
2706  *
2707  * \param[in]     rsc_id     Resource being checked
2708  * \param[in]     node_name  Node being checked
2709  * \param[in]     xml_op     Event that monitor is being compared to
2710  * \param[in]     same_node  Whether the operations are on the same node
2711  * \param[in,out] data_set   Cluster working set
2712  *
2713  * \return true if such a monitor happened after event, false otherwise
2714  */
2715 static bool
2716 monitor_not_running_after(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2717                           const xmlNode *xml_op, bool same_node,
2718                           pe_working_set_t *data_set)
2719 {
2720     /* Any probe/monitor operation on the node indicating it was not running
2721      * there
2722      */
2723     xmlNode *monitor = find_lrm_op(rsc_id, CRMD_ACTION_STATUS, node_name,
2724                                    NULL, PCMK_OCF_NOT_RUNNING, data_set);
2725 
2726     return (monitor && pe__is_newer_op(monitor, xml_op, same_node) > 0);
2727 }
2728 
2729 /*!
2730  * \brief Check whether any non-monitor operation on a node happened after some
2731  * event
2732  *
2733  * \param[in]     rsc_id    Resource being checked
2734  * \param[in]     node_name Node being checked
2735  * \param[in]     xml_op    Event that non-monitor is being compared to
2736  * \param[in]     same_node Whether the operations are on the same node
2737  * \param[in,out] data_set  Cluster working set
2738  *
2739  * \return true if such a operation happened after event, false otherwise
2740  */
2741 static bool
2742 non_monitor_after(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2743                   const xmlNode *xml_op, bool same_node,
2744                   pe_working_set_t *data_set)
2745 {
2746     xmlNode *lrm_resource = NULL;
2747 
2748     lrm_resource = find_lrm_resource(rsc_id, node_name, data_set);
2749     if (lrm_resource == NULL) {
2750         return false;
2751     }
2752 
2753     for (xmlNode *op = first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP);
2754          op != NULL; op = crm_next_same_xml(op)) {
2755         const char * task = NULL;
2756 
2757         if (op == xml_op) {
2758             continue;
2759         }
2760 
2761         task = crm_element_value(op, XML_LRM_ATTR_TASK);
2762 
2763         if (pcmk__str_any_of(task, CRMD_ACTION_START, CRMD_ACTION_STOP,
2764                              CRMD_ACTION_MIGRATE, CRMD_ACTION_MIGRATED, NULL)
2765             && pe__is_newer_op(op, xml_op, same_node) > 0) {
2766             return true;
2767         }
2768     }
2769 
2770     return false;
2771 }
2772 
2773 /*!
2774  * \brief Check whether the resource has newer state on a node after a migration
2775  * attempt
2776  *
2777  * \param[in]     rsc_id       Resource being checked
2778  * \param[in]     node_name    Node being checked
2779  * \param[in]     migrate_to   Any migrate_to event that is being compared to
2780  * \param[in]     migrate_from Any migrate_from event that is being compared to
2781  * \param[in,out] data_set     Cluster working set
2782  *
2783  * \return true if such a operation happened after event, false otherwise
2784  */
2785 static bool
2786 newer_state_after_migrate(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2787                           const xmlNode *migrate_to,
2788                           const xmlNode *migrate_from,
2789                           pe_working_set_t *data_set)
2790 {
2791     const xmlNode *xml_op = migrate_to;
2792     const char *source = NULL;
2793     const char *target = NULL;
2794     bool same_node = false;
2795 
2796     if (migrate_from) {
2797         xml_op = migrate_from;
2798     }
2799 
2800     source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2801     target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2802 
2803     /* It's preferred to compare to the migrate event on the same node if
2804      * existing, since call ids are more reliable.
2805      */
2806     if (pcmk__str_eq(node_name, target, pcmk__str_casei)) {
2807         if (migrate_from) {
2808            xml_op = migrate_from;
2809            same_node = true;
2810 
2811         } else {
2812            xml_op = migrate_to;
2813         }
2814 
2815     } else if (pcmk__str_eq(node_name, source, pcmk__str_casei)) {
2816         if (migrate_to) {
2817            xml_op = migrate_to;
2818            same_node = true;
2819 
2820         } else {
2821            xml_op = migrate_from;
2822         }
2823     }
2824 
2825     /* If there's any newer non-monitor operation on the node, or any newer
2826      * probe/monitor operation on the node indicating it was not running there,
2827      * the migration events potentially no longer matter for the node.
2828      */
2829     return non_monitor_after(rsc_id, node_name, xml_op, same_node, data_set)
2830            || monitor_not_running_after(rsc_id, node_name, xml_op, same_node,
2831                                         data_set);
2832 }
2833 
2834 /*!
2835  * \internal
2836  * \brief Parse migration source and target node names from history entry
2837  *
2838  * \param[in]  entry        Resource history entry for a migration action
2839  * \param[in]  source_node  If not NULL, source must match this node
2840  * \param[in]  target_node  If not NULL, target must match this node
2841  * \param[out] source_name  Where to store migration source node name
2842  * \param[out] target_name  Where to store migration target node name
2843  *
2844  * \return Standard Pacemaker return code
2845  */
2846 static int
2847 get_migration_node_names(const xmlNode *entry, const pe_node_t *source_node,
     /* [previous][next][first][last][top][bottom][index][help] */
2848                          const pe_node_t *target_node,
2849                          const char **source_name, const char **target_name)
2850 {
2851     *source_name = crm_element_value(entry, XML_LRM_ATTR_MIGRATE_SOURCE);
2852     *target_name = crm_element_value(entry, XML_LRM_ATTR_MIGRATE_TARGET);
2853     if ((*source_name == NULL) || (*target_name == NULL)) {
2854         crm_err("Ignoring resource history entry %s without "
2855                 XML_LRM_ATTR_MIGRATE_SOURCE " and " XML_LRM_ATTR_MIGRATE_TARGET,
2856                 ID(entry));
2857         return pcmk_rc_unpack_error;
2858     }
2859 
2860     if ((source_node != NULL)
2861         && !pcmk__str_eq(*source_name, source_node->details->uname,
2862                          pcmk__str_casei|pcmk__str_null_matches)) {
2863         crm_err("Ignoring resource history entry %s because "
2864                 XML_LRM_ATTR_MIGRATE_SOURCE "='%s' does not match %s",
2865                 ID(entry), *source_name, pe__node_name(source_node));
2866         return pcmk_rc_unpack_error;
2867     }
2868 
2869     if ((target_node != NULL)
2870         && !pcmk__str_eq(*target_name, target_node->details->uname,
2871                          pcmk__str_casei|pcmk__str_null_matches)) {
2872         crm_err("Ignoring resource history entry %s because "
2873                 XML_LRM_ATTR_MIGRATE_TARGET "='%s' does not match %s",
2874                 ID(entry), *target_name, pe__node_name(target_node));
2875         return pcmk_rc_unpack_error;
2876     }
2877 
2878     return pcmk_rc_ok;
2879 }
2880 
2881 /*
2882  * \internal
2883  * \brief Add a migration source to a resource's list of dangling migrations
2884  *
2885  * If the migrate_to and migrate_from actions in a live migration both
2886  * succeeded, but there is no stop on the source, the migration is considered
2887  * "dangling." Add the source to the resource's dangling migration list, which
2888  * will be used to schedule a stop on the source without affecting the target.
2889  *
2890  * \param[in,out] rsc   Resource involved in migration
2891  * \param[in]     node  Migration source
2892  */
2893 static void
2894 add_dangling_migration(pe_resource_t *rsc, const pe_node_t *node)
     /* [previous][next][first][last][top][bottom][index][help] */
2895 {
2896     pe_rsc_trace(rsc, "Dangling migration of %s requires stop on %s",
2897                  rsc->id, pe__node_name(node));
2898     rsc->role = RSC_ROLE_STOPPED;
2899     rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations,
2900                                               (gpointer) node);
2901 }
2902 
2903 /*!
2904  * \internal
2905  * \brief Update resource role etc. after a successful migrate_to action
2906  *
2907  * \param[in,out] history  Parsed action result history
2908  */
2909 static void
2910 unpack_migrate_to_success(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
2911 {
2912     /* A complete migration sequence is:
2913      * 1. migrate_to on source node (which succeeded if we get to this function)
2914      * 2. migrate_from on target node
2915      * 3. stop on source node
2916      *
2917      * If no migrate_from has happened, the migration is considered to be
2918      * "partial". If the migrate_from succeeded but no stop has happened, the
2919      * migration is considered to be "dangling".
2920      *
2921      * If a successful migrate_to and stop have happened on the source node, we
2922      * still need to check for a partial migration, due to scenarios (easier to
2923      * produce with batch-limit=1) like:
2924      *
2925      * - A resource is migrating from node1 to node2, and a migrate_to is
2926      *   initiated for it on node1.
2927      *
2928      * - node2 goes into standby mode while the migrate_to is pending, which
2929      *   aborts the transition.
2930      *
2931      * - Upon completion of the migrate_to, a new transition schedules a stop
2932      *   on both nodes and a start on node1.
2933      *
2934      * - If the new transition is aborted for any reason while the resource is
2935      *   stopping on node1, the transition after that stop completes will see
2936      *   the migrate_to and stop on the source, but it's still a partial
2937      *   migration, and the resource must be stopped on node2 because it is
2938      *   potentially active there due to the migrate_to.
2939      *
2940      *   We also need to take into account that either node's history may be
2941      *   cleared at any point in the migration process.
2942      */
2943     int from_rc = PCMK_OCF_OK;
2944     int from_status = PCMK_EXEC_PENDING;
2945     pe_node_t *target_node = NULL;
2946     xmlNode *migrate_from = NULL;
2947     const char *source = NULL;
2948     const char *target = NULL;
2949     bool source_newer_op = false;
2950     bool target_newer_state = false;
2951     bool active_on_target = false;
2952 
2953     // Get source and target node names from XML
2954     if (get_migration_node_names(history->xml, history->node, NULL, &source,
2955                                  &target) != pcmk_rc_ok) {
2956         return;
2957     }
2958 
2959     // Check for newer state on the source
2960     source_newer_op = non_monitor_after(history->rsc->id, source, history->xml,
2961                                         true, history->rsc->cluster);
2962 
2963     // Check for a migrate_from action from this source on the target
2964     migrate_from = find_lrm_op(history->rsc->id, CRMD_ACTION_MIGRATED, target,
2965                                source, -1, history->rsc->cluster);
2966     if (migrate_from != NULL) {
2967         if (source_newer_op) {
2968             /* There's a newer non-monitor operation on the source and a
2969              * migrate_from on the target, so this migrate_to is irrelevant to
2970              * the resource's state.
2971              */
2972             return;
2973         }
2974         crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
2975         crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS,
2976                               &from_status);
2977     }
2978 
2979     /* If the resource has newer state on both the source and target after the
2980      * migration events, this migrate_to is irrelevant to the resource's state.
2981      */
2982     target_newer_state = newer_state_after_migrate(history->rsc->id, target,
2983                                                    history->xml, migrate_from,
2984                                                    history->rsc->cluster);
2985     if (source_newer_op && target_newer_state) {
2986         return;
2987     }
2988 
2989     /* Check for dangling migration (migrate_from succeeded but stop not done).
2990      * We know there's no stop because we already returned if the target has a
2991      * migrate_from and the source has any newer non-monitor operation.
2992      */
2993     if ((from_rc == PCMK_OCF_OK) && (from_status == PCMK_EXEC_DONE)) {
2994         add_dangling_migration(history->rsc, history->node);
2995         return;
2996     }
2997 
2998     /* Without newer state, this migrate_to implies the resource is active.
2999      * (Clones are not allowed to migrate, so role can't be promoted.)
3000      */
3001     history->rsc->role = RSC_ROLE_STARTED;
3002 
3003     target_node = pe_find_node(history->rsc->cluster->nodes, target);
3004     active_on_target = !target_newer_state && (target_node != NULL)
3005                        && target_node->details->online;
3006 
3007     if (from_status != PCMK_EXEC_PENDING) { // migrate_from failed on target
3008         if (active_on_target) {
3009             native_add_running(history->rsc, target_node, history->rsc->cluster,
3010                                TRUE);
3011         } else {
3012             // Mark resource as failed, require recovery, and prevent migration
3013             pe__set_resource_flags(history->rsc, pe_rsc_failed|pe_rsc_stop);
3014             pe__clear_resource_flags(history->rsc, pe_rsc_allow_migrate);
3015         }
3016         return;
3017     }
3018 
3019     // The migrate_from is pending, complete but erased, or to be scheduled
3020 
3021     /* If there is no history at all for the resource on an online target, then
3022      * it was likely cleaned. Just return, and we'll schedule a probe. Once we
3023      * have the probe result, it will be reflected in target_newer_state.
3024      */
3025     if ((target_node != NULL) && target_node->details->online
3026         && unknown_on_node(history->rsc, target)) {
3027         return;
3028     }
3029 
3030     if (active_on_target) {
3031         pe_node_t *source_node = pe_find_node(history->rsc->cluster->nodes,
3032                                               source);
3033 
3034         native_add_running(history->rsc, target_node, history->rsc->cluster,
3035                            FALSE);
3036         if ((source_node != NULL) && source_node->details->online) {
3037             /* This is a partial migration: the migrate_to completed
3038              * successfully on the source, but the migrate_from has not
3039              * completed. Remember the source and target; if the newly
3040              * chosen target remains the same when we schedule actions
3041              * later, we may continue with the migration.
3042              */
3043             history->rsc->partial_migration_target = target_node;
3044             history->rsc->partial_migration_source = source_node;
3045         }
3046 
3047     } else if (!source_newer_op) {
3048         // Mark resource as failed, require recovery, and prevent migration
3049         pe__set_resource_flags(history->rsc, pe_rsc_failed|pe_rsc_stop);
3050         pe__clear_resource_flags(history->rsc, pe_rsc_allow_migrate);
3051     }
3052 }
3053 
3054 /*!
3055  * \internal
3056  * \brief Update resource role etc. after a failed migrate_to action
3057  *
3058  * \param[in,out] history  Parsed action result history
3059  */
3060 static void
3061 unpack_migrate_to_failure(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
3062 {
3063     xmlNode *target_migrate_from = NULL;
3064     const char *source = NULL;
3065     const char *target = NULL;
3066 
3067     // Get source and target node names from XML
3068     if (get_migration_node_names(history->xml, history->node, NULL, &source,
3069                                  &target) != pcmk_rc_ok) {
3070         return;
3071     }
3072 
3073     /* If a migration failed, we have to assume the resource is active. Clones
3074      * are not allowed to migrate, so role can't be promoted.
3075      */
3076     history->rsc->role = RSC_ROLE_STARTED;
3077 
3078     // Check for migrate_from on the target
3079     target_migrate_from = find_lrm_op(history->rsc->id, CRMD_ACTION_MIGRATED,
3080                                       target, source, PCMK_OCF_OK,
3081                                       history->rsc->cluster);
3082 
3083     if (/* If the resource state is unknown on the target, it will likely be
3084          * probed there.
3085          * Don't just consider it running there. We will get back here anyway in
3086          * case the probe detects it's running there.
3087          */
3088         !unknown_on_node(history->rsc, target)
3089         /* If the resource has newer state on the target after the migration
3090          * events, this migrate_to no longer matters for the target.
3091          */
3092         && !newer_state_after_migrate(history->rsc->id, target, history->xml,
3093                                       target_migrate_from,
3094                                       history->rsc->cluster)) {
3095         /* The resource has no newer state on the target, so assume it's still
3096          * active there.
3097          * (if it is up).
3098          */
3099         pe_node_t *target_node = pe_find_node(history->rsc->cluster->nodes,
3100                                               target);
3101 
3102         if (target_node && target_node->details->online) {
3103             native_add_running(history->rsc, target_node, history->rsc->cluster,
3104                                FALSE);
3105         }
3106 
3107     } else if (!non_monitor_after(history->rsc->id, source, history->xml, true,
3108                                   history->rsc->cluster)) {
3109         /* We know the resource has newer state on the target, but this
3110          * migrate_to still matters for the source as long as there's no newer
3111          * non-monitor operation there.
3112          */
3113 
3114         // Mark node as having dangling migration so we can force a stop later
3115         history->rsc->dangling_migrations =
3116             g_list_prepend(history->rsc->dangling_migrations,
3117                            (gpointer) history->node);
3118     }
3119 }
3120 
3121 /*!
3122  * \internal
3123  * \brief Update resource role etc. after a failed migrate_from action
3124  *
3125  * \param[in,out] history  Parsed action result history
3126  */
3127 static void
3128 unpack_migrate_from_failure(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
3129 {
3130     xmlNode *source_migrate_to = NULL;
3131     const char *source = NULL;
3132     const char *target = NULL;
3133 
3134     // Get source and target node names from XML
3135     if (get_migration_node_names(history->xml, NULL, history->node, &source,
3136                                  &target) != pcmk_rc_ok) {
3137         return;
3138     }
3139 
3140     /* If a migration failed, we have to assume the resource is active. Clones
3141      * are not allowed to migrate, so role can't be promoted.
3142      */
3143     history->rsc->role = RSC_ROLE_STARTED;
3144 
3145     // Check for a migrate_to on the source
3146     source_migrate_to = find_lrm_op(history->rsc->id, CRMD_ACTION_MIGRATE,
3147                                     source, target, PCMK_OCF_OK,
3148                                     history->rsc->cluster);
3149 
3150     if (/* If the resource state is unknown on the source, it will likely be
3151          * probed there.
3152          * Don't just consider it running there. We will get back here anyway in
3153          * case the probe detects it's running there.
3154          */
3155         !unknown_on_node(history->rsc, source)
3156         /* If the resource has newer state on the source after the migration
3157          * events, this migrate_from no longer matters for the source.
3158          */
3159         && !newer_state_after_migrate(history->rsc->id, source,
3160                                       source_migrate_to, history->xml,
3161                                       history->rsc->cluster)) {
3162         /* The resource has no newer state on the source, so assume it's still
3163          * active there (if it is up).
3164          */
3165         pe_node_t *source_node = pe_find_node(history->rsc->cluster->nodes,
3166                                               source);
3167 
3168         if (source_node && source_node->details->online) {
3169             native_add_running(history->rsc, source_node, history->rsc->cluster,
3170                                TRUE);
3171         }
3172     }
3173 }
3174 
3175 /*!
3176  * \internal
3177  * \brief Add an action to cluster's list of failed actions
3178  *
3179  * \param[in,out] history  Parsed action result history
3180  */
3181 static void
3182 record_failed_op(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
3183 {
3184     if (!(history->node->details->online)) {
3185         return;
3186     }
3187 
3188     for (const xmlNode *xIter = history->rsc->cluster->failed->children;
3189          xIter != NULL; xIter = xIter->next) {
3190 
3191         const char *key = pe__xe_history_key(xIter);
3192         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
3193 
3194         if (pcmk__str_eq(history->key, key, pcmk__str_none)
3195             && pcmk__str_eq(uname, history->node->details->uname,
3196                             pcmk__str_casei)) {
3197             crm_trace("Skipping duplicate entry %s on %s",
3198                       history->key, pe__node_name(history->node));
3199             return;
3200         }
3201     }
3202 
3203     crm_trace("Adding entry for %s on %s to failed action list",
3204               history->key, pe__node_name(history->node));
3205     crm_xml_add(history->xml, XML_ATTR_UNAME, history->node->details->uname);
3206     crm_xml_add(history->xml, XML_LRM_ATTR_RSCID, history->rsc->id);
3207     add_node_copy(history->rsc->cluster->failed, history->xml);
3208 }
3209 
3210 static char *
3211 last_change_str(const xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
3212 {
3213     time_t when;
3214     char *result = NULL;
3215 
3216     if (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
3217                                 &when) == pcmk_ok) {
3218         char *when_s = pcmk__epoch2str(&when, 0);
3219         const char *p = strchr(when_s, ' ');
3220 
3221         // Skip day of week to make message shorter
3222         if ((p != NULL) && (*(++p) != '\0')) {
3223             result = strdup(p);
3224             CRM_ASSERT(result != NULL);
3225         }
3226         free(when_s);
3227     }
3228 
3229     if (result == NULL) {
3230         result = strdup("unknown time");
3231         CRM_ASSERT(result != NULL);
3232     }
3233 
3234     return result;
3235 }
3236 
3237 /*!
3238  * \internal
3239  * \brief Compare two on-fail values
3240  *
3241  * \param[in] first   One on-fail value to compare
3242  * \param[in] second  The other on-fail value to compare
3243  *
3244  * \return A negative number if second is more severe than first, zero if they
3245  *         are equal, or a positive number if first is more severe than second.
3246  * \note This is only needed until the action_fail_response values can be
3247  *       renumbered at the next API compatibility break.
3248  */
3249 static int
3250 cmp_on_fail(enum action_fail_response first, enum action_fail_response second)
     /* [previous][next][first][last][top][bottom][index][help] */
3251 {
3252     switch (first) {
3253         case action_fail_demote:
3254             switch (second) {
3255                 case action_fail_ignore:
3256                     return 1;
3257                 case action_fail_demote:
3258                     return 0;
3259                 default:
3260                     return -1;
3261             }
3262             break;
3263 
3264         case action_fail_reset_remote:
3265             switch (second) {
3266                 case action_fail_ignore:
3267                 case action_fail_demote:
3268                 case action_fail_recover:
3269                     return 1;
3270                 case action_fail_reset_remote:
3271                     return 0;
3272                 default:
3273                     return -1;
3274             }
3275             break;
3276 
3277         case action_fail_restart_container:
3278             switch (second) {
3279                 case action_fail_ignore:
3280                 case action_fail_demote:
3281                 case action_fail_recover:
3282                 case action_fail_reset_remote:
3283                     return 1;
3284                 case action_fail_restart_container:
3285                     return 0;
3286                 default:
3287                     return -1;
3288             }
3289             break;
3290 
3291         default:
3292             break;
3293     }
3294     switch (second) {
3295         case action_fail_demote:
3296             return (first == action_fail_ignore)? -1 : 1;
3297 
3298         case action_fail_reset_remote:
3299             switch (first) {
3300                 case action_fail_ignore:
3301                 case action_fail_demote:
3302                 case action_fail_recover:
3303                     return -1;
3304                 default:
3305                     return 1;
3306             }
3307             break;
3308 
3309         case action_fail_restart_container:
3310             switch (first) {
3311                 case action_fail_ignore:
3312                 case action_fail_demote:
3313                 case action_fail_recover:
3314                 case action_fail_reset_remote:
3315                     return -1;
3316                 default:
3317                     return 1;
3318             }
3319             break;
3320 
3321         default:
3322             break;
3323     }
3324     return first - second;
3325 }
3326 
3327 /*!
3328  * \internal
3329  * \brief Ban a resource (or its clone if an anonymous instance) from all nodes
3330  *
3331  * \param[in,out] rsc  Resource to ban
3332  */
3333 static void
3334 ban_from_all_nodes(pe_resource_t *rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
3335 {
3336     int score = -INFINITY;
3337     pe_resource_t *fail_rsc = rsc;
3338 
3339     if (fail_rsc->parent != NULL) {
3340         pe_resource_t *parent = uber_parent(fail_rsc);
3341 
3342         if (pe_rsc_is_anon_clone(parent)) {
3343             /* For anonymous clones, if an operation with on-fail=stop fails for
3344              * any instance, the entire clone must stop.
3345              */
3346             fail_rsc = parent;
3347         }
3348     }
3349 
3350     // Ban the resource from all nodes
3351     crm_notice("%s will not be started under current conditions", fail_rsc->id);
3352     if (fail_rsc->allowed_nodes != NULL) {
3353         g_hash_table_destroy(fail_rsc->allowed_nodes);
3354     }
3355     fail_rsc->allowed_nodes = pe__node_list2table(rsc->cluster->nodes);
3356     g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
3357 }
3358 
3359 /*!
3360  * \internal
3361  * \brief Update resource role, failure handling, etc., after a failed action
3362  *
3363  * \param[in,out] history       Parsed action result history
3364  * \param[out]    last_failure  Set this to action XML
3365  * \param[in,out] on_fail       What should be done about the result
3366  */
3367 static void
3368 unpack_rsc_op_failure(struct action_history *history, xmlNode **last_failure,
     /* [previous][next][first][last][top][bottom][index][help] */
3369                       enum action_fail_response *on_fail)
3370 {
3371     bool is_probe = false;
3372     pe_action_t *action = NULL;
3373     char *last_change_s = NULL;
3374 
3375     *last_failure = history->xml;
3376 
3377     is_probe = pcmk_xe_is_probe(history->xml);
3378     last_change_s = last_change_str(history->xml);
3379 
3380     if (!pcmk_is_set(history->rsc->cluster->flags, pe_flag_symmetric_cluster)
3381         && (history->exit_status == PCMK_OCF_NOT_INSTALLED)) {
3382         crm_trace("Unexpected result (%s%s%s) was recorded for "
3383                   "%s of %s on %s at %s " CRM_XS " exit-status=%d id=%s",
3384                   services_ocf_exitcode_str(history->exit_status),
3385                   (pcmk__str_empty(history->exit_reason)? "" : ": "),
3386                   pcmk__s(history->exit_reason, ""),
3387                   (is_probe? "probe" : history->task), history->rsc->id,
3388                   pe__node_name(history->node), last_change_s,
3389                   history->exit_status, history->id);
3390     } else {
3391         crm_warn("Unexpected result (%s%s%s) was recorded for "
3392                   "%s of %s on %s at %s " CRM_XS " exit-status=%d id=%s",
3393                  services_ocf_exitcode_str(history->exit_status),
3394                  (pcmk__str_empty(history->exit_reason)? "" : ": "),
3395                  pcmk__s(history->exit_reason, ""),
3396                  (is_probe? "probe" : history->task), history->rsc->id,
3397                  pe__node_name(history->node), last_change_s,
3398                  history->exit_status, history->id);
3399 
3400         if (is_probe && (history->exit_status != PCMK_OCF_OK)
3401             && (history->exit_status != PCMK_OCF_NOT_RUNNING)
3402             && (history->exit_status != PCMK_OCF_RUNNING_PROMOTED)) {
3403 
3404             /* A failed (not just unexpected) probe result could mean the user
3405              * didn't know resources will be probed even where they can't run.
3406              */
3407             crm_notice("If it is not possible for %s to run on %s, see "
3408                        "the resource-discovery option for location constraints",
3409                        history->rsc->id, pe__node_name(history->node));
3410         }
3411 
3412         record_failed_op(history);
3413     }
3414 
3415     free(last_change_s);
3416 
3417     action = custom_action(history->rsc, strdup(history->key), history->task,
3418                            NULL, TRUE, FALSE, history->rsc->cluster);
3419     if (cmp_on_fail(*on_fail, action->on_fail) < 0) {
3420         pe_rsc_trace(history->rsc, "on-fail %s -> %s for %s (%s)",
3421                      fail2text(*on_fail), fail2text(action->on_fail),
3422                      action->uuid, history->key);
3423         *on_fail = action->on_fail;
3424     }
3425 
3426     if (strcmp(history->task, CRMD_ACTION_STOP) == 0) {
3427         resource_location(history->rsc, history->node, -INFINITY,
3428                           "__stop_fail__", history->rsc->cluster);
3429 
3430     } else if (strcmp(history->task, CRMD_ACTION_MIGRATE) == 0) {
3431         unpack_migrate_to_failure(history);
3432 
3433     } else if (strcmp(history->task, CRMD_ACTION_MIGRATED) == 0) {
3434         unpack_migrate_from_failure(history);
3435 
3436     } else if (strcmp(history->task, CRMD_ACTION_PROMOTE) == 0) {
3437         history->rsc->role = RSC_ROLE_PROMOTED;
3438 
3439     } else if (strcmp(history->task, CRMD_ACTION_DEMOTE) == 0) {
3440         if (action->on_fail == action_fail_block) {
3441             history->rsc->role = RSC_ROLE_PROMOTED;
3442             pe__set_next_role(history->rsc, RSC_ROLE_STOPPED,
3443                               "demote with on-fail=block");
3444 
3445         } else if (history->exit_status == PCMK_OCF_NOT_RUNNING) {
3446             history->rsc->role = RSC_ROLE_STOPPED;
3447 
3448         } else {
3449             /* Staying in the promoted role would put the scheduler and
3450              * controller into a loop. Setting the role to unpromoted is not
3451              * dangerous because the resource will be stopped as part of
3452              * recovery, and any promotion will be ordered after that stop.
3453              */
3454             history->rsc->role = RSC_ROLE_UNPROMOTED;
3455         }
3456     }
3457 
3458     if (is_probe && (history->exit_status == PCMK_OCF_NOT_INSTALLED)) {
3459         /* leave stopped */
3460         pe_rsc_trace(history->rsc, "Leaving %s stopped", history->rsc->id);
3461         history->rsc->role = RSC_ROLE_STOPPED;
3462 
3463     } else if (history->rsc->role < RSC_ROLE_STARTED) {
3464         pe_rsc_trace(history->rsc, "Setting %s active", history->rsc->id);
3465         set_active(history->rsc);
3466     }
3467 
3468     pe_rsc_trace(history->rsc,
3469                  "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
3470                  history->rsc->id, role2text(history->rsc->role),
3471                  pcmk__btoa(history->node->details->unclean),
3472                  fail2text(action->on_fail), role2text(action->fail_role));
3473 
3474     if ((action->fail_role != RSC_ROLE_STARTED)
3475         && (history->rsc->next_role < action->fail_role)) {
3476         pe__set_next_role(history->rsc, action->fail_role, "failure");
3477     }
3478 
3479     if (action->fail_role == RSC_ROLE_STOPPED) {
3480         ban_from_all_nodes(history->rsc);
3481     }
3482 
3483     pe_free_action(action);
3484 }
3485 
3486 /*!
3487  * \internal
3488  * \brief Block a resource with a failed action if it cannot be recovered
3489  *
3490  * If resource action is a failed stop and fencing is not possible, mark the
3491  * resource as unmanaged and blocked, since recovery cannot be done.
3492  *
3493  * \param[in,out] history  Parsed action history entry
3494  */
3495 static void
3496 block_if_unrecoverable(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
3497 {
3498     char *last_change_s = NULL;
3499 
3500     if (strcmp(history->task, CRMD_ACTION_STOP) != 0) {
3501         return; // All actions besides stop are always recoverable
3502     }
3503     if (pe_can_fence(history->node->details->data_set, history->node)) {
3504         return; // Failed stops are recoverable via fencing
3505     }
3506 
3507     last_change_s = last_change_str(history->xml);
3508     pe_proc_err("No further recovery can be attempted for %s "
3509                 "because %s on %s failed (%s%s%s) at %s "
3510                 CRM_XS " rc=%d id=%s",
3511                 history->rsc->id, history->task, pe__node_name(history->node),
3512                 services_ocf_exitcode_str(history->exit_status),
3513                 (pcmk__str_empty(history->exit_reason)? "" : ": "),
3514                 pcmk__s(history->exit_reason, ""),
3515                 last_change_s, history->exit_status, history->id);
3516 
3517     free(last_change_s);
3518 
3519     pe__clear_resource_flags(history->rsc, pe_rsc_managed);
3520     pe__set_resource_flags(history->rsc, pe_rsc_block);
3521 }
3522 
3523 /*!
3524  * \internal
3525  * \brief Update action history's execution status and why
3526  *
3527  * \param[in,out] history  Parsed action history entry
3528  * \param[out]    why      Where to store reason for update
3529  * \param[in]     value    New value
3530  * \param[in]     reason   Description of why value was changed
3531  */
3532 static inline void
3533 remap_because(struct action_history *history, const char **why, int value,
     /* [previous][next][first][last][top][bottom][index][help] */
3534               const char *reason)
3535 {
3536     if (history->execution_status != value) {
3537         history->execution_status = value;
3538         *why = reason;
3539     }
3540 }
3541 
3542 /*!
3543  * \internal
3544  * \brief Remap informational monitor results and operation status
3545  *
3546  * For the monitor results, certain OCF codes are for providing extended information
3547  * to the user about services that aren't yet failed but not entirely healthy either.
3548  * These must be treated as the "normal" result by Pacemaker.
3549  *
3550  * For operation status, the action result can be used to determine an appropriate
3551  * status for the purposes of responding to the action.  The status provided by the
3552  * executor is not directly usable since the executor does not know what was expected.
3553  *
3554  * \param[in,out] history  Parsed action history entry
3555  * \param[in,out] on_fail  What should be done about the result
3556  * \param[in]     expired  Whether result is expired
3557  *
3558  * \note If the result is remapped and the node is not shutting down or failed,
3559  *       the operation will be recorded in the data set's list of failed operations
3560  *       to highlight it for the user.
3561  *
3562  * \note This may update the resource's current and next role.
3563  */
3564 static void
3565 remap_operation(struct action_history *history,
     /* [previous][next][first][last][top][bottom][index][help] */
3566                 enum action_fail_response *on_fail, bool expired)
3567 {
3568     bool is_probe = false;
3569     int orig_exit_status = history->exit_status;
3570     int orig_exec_status = history->execution_status;
3571     const char *why = NULL;
3572     const char *task = history->task;
3573 
3574     // Remap degraded results to their successful counterparts
3575     history->exit_status = pcmk__effective_rc(history->exit_status);
3576     if (history->exit_status != orig_exit_status) {
3577         why = "degraded result";
3578         if (!expired && (!history->node->details->shutdown
3579                          || history->node->details->online)) {
3580             record_failed_op(history);
3581         }
3582     }
3583 
3584     if (!pe_rsc_is_bundled(history->rsc)
3585         && pcmk_xe_mask_probe_failure(history->xml)
3586         && ((history->execution_status != PCMK_EXEC_DONE)
3587             || (history->exit_status != PCMK_OCF_NOT_RUNNING))) {
3588         history->execution_status = PCMK_EXEC_DONE;
3589         history->exit_status = PCMK_OCF_NOT_RUNNING;
3590         why = "equivalent probe result";
3591     }
3592 
3593     /* If the executor reported an execution status of anything but done or
3594      * error, consider that final. But for done or error, we know better whether
3595      * it should be treated as a failure or not, because we know the expected
3596      * result.
3597      */
3598     switch (history->execution_status) {
3599         case PCMK_EXEC_DONE:
3600         case PCMK_EXEC_ERROR:
3601             break;
3602 
3603         // These should be treated as node-fatal
3604         case PCMK_EXEC_NO_FENCE_DEVICE:
3605         case PCMK_EXEC_NO_SECRETS:
3606             remap_because(history, &why, PCMK_EXEC_ERROR_HARD,
3607                           "node-fatal error");
3608             goto remap_done;
3609 
3610         default:
3611             goto remap_done;
3612     }
3613 
3614     is_probe = pcmk_xe_is_probe(history->xml);
3615     if (is_probe) {
3616         task = "probe";
3617     }
3618 
3619     if (history->expected_exit_status < 0) {
3620         /* Pre-1.0 Pacemaker versions, and Pacemaker 1.1.6 or earlier with
3621          * Heartbeat 2.0.7 or earlier as the cluster layer, did not include the
3622          * expected exit status in the transition key, which (along with the
3623          * similar case of a corrupted transition key in the CIB) will be
3624          * reported to this function as -1. Pacemaker 2.0+ does not support
3625          * rolling upgrades from those versions or processing of saved CIB files
3626          * from those versions, so we do not need to care much about this case.
3627          */
3628         remap_because(history, &why, PCMK_EXEC_ERROR,
3629                       "obsolete history format");
3630         crm_warn("Expected result not found for %s on %s "
3631                  "(corrupt or obsolete CIB?)",
3632                  history->key, pe__node_name(history->node));
3633 
3634     } else if (history->exit_status == history->expected_exit_status) {
3635         remap_because(history, &why, PCMK_EXEC_DONE, "expected result");
3636 
3637     } else {
3638         remap_because(history, &why, PCMK_EXEC_ERROR, "unexpected result");
3639         pe_rsc_debug(history->rsc,
3640                      "%s on %s: expected %d (%s), got %d (%s%s%s)",
3641                      history->key, pe__node_name(history->node),
3642                      history->expected_exit_status,
3643                      services_ocf_exitcode_str(history->expected_exit_status),
3644                      history->exit_status,
3645                      services_ocf_exitcode_str(history->exit_status),
3646                      (pcmk__str_empty(history->exit_reason)? "" : ": "),
3647                      pcmk__s(history->exit_reason, ""));
3648     }
3649 
3650     switch (history->exit_status) {
3651         case PCMK_OCF_OK:
3652             if (is_probe
3653                 && (history->expected_exit_status == PCMK_OCF_NOT_RUNNING)) {
3654                 char *last_change_s = last_change_str(history->xml);
3655 
3656                 remap_because(history, &why, PCMK_EXEC_DONE, "probe");
3657                 pe_rsc_info(history->rsc, "Probe found %s active on %s at %s",
3658                             history->rsc->id, pe__node_name(history->node),
3659                             last_change_s);
3660                 free(last_change_s);
3661             }
3662             break;
3663 
3664         case PCMK_OCF_NOT_RUNNING:
3665             if (is_probe
3666                 || (history->expected_exit_status == history->exit_status)
3667                 || !pcmk_is_set(history->rsc->flags, pe_rsc_managed)) {
3668 
3669                 /* For probes, recurring monitors for the Stopped role, and
3670                  * unmanaged resources, "not running" is not considered a
3671                  * failure.
3672                  */
3673                 remap_because(history, &why, PCMK_EXEC_DONE, "exit status");
3674                 history->rsc->role = RSC_ROLE_STOPPED;
3675                 *on_fail = action_fail_ignore;
3676                 pe__set_next_role(history->rsc, RSC_ROLE_UNKNOWN,
3677                                   "not running");
3678             }
3679             break;
3680 
3681         case PCMK_OCF_RUNNING_PROMOTED:
3682             if (is_probe
3683                 && (history->exit_status != history->expected_exit_status)) {
3684                 char *last_change_s = last_change_str(history->xml);
3685 
3686                 remap_because(history, &why, PCMK_EXEC_DONE, "probe");
3687                 pe_rsc_info(history->rsc,
3688                             "Probe found %s active and promoted on %s at %s",
3689                             history->rsc->id, pe__node_name(history->node),
3690                             last_change_s);
3691                 free(last_change_s);
3692             }
3693             if (!expired
3694                 || (history->exit_status == history->expected_exit_status)) {
3695                 history->rsc->role = RSC_ROLE_PROMOTED;
3696             }
3697             break;
3698 
3699         case PCMK_OCF_FAILED_PROMOTED:
3700             if (!expired) {
3701                 history->rsc->role = RSC_ROLE_PROMOTED;
3702             }
3703             remap_because(history, &why, PCMK_EXEC_ERROR, "exit status");
3704             break;
3705 
3706         case PCMK_OCF_NOT_CONFIGURED:
3707             remap_because(history, &why, PCMK_EXEC_ERROR_FATAL, "exit status");
3708             break;
3709 
3710         case PCMK_OCF_UNIMPLEMENT_FEATURE:
3711             {
3712                 guint interval_ms = 0;
3713                 crm_element_value_ms(history->xml, XML_LRM_ATTR_INTERVAL_MS,
3714                                      &interval_ms);
3715 
3716                 if (interval_ms == 0) {
3717                     if (!expired) {
3718                         block_if_unrecoverable(history);
3719                     }
3720                     remap_because(history, &why, PCMK_EXEC_ERROR_HARD,
3721                                   "exit status");
3722                 } else {
3723                     remap_because(history, &why, PCMK_EXEC_NOT_SUPPORTED,
3724                                   "exit status");
3725                 }
3726             }
3727             break;
3728 
3729         case PCMK_OCF_NOT_INSTALLED:
3730         case PCMK_OCF_INVALID_PARAM:
3731         case PCMK_OCF_INSUFFICIENT_PRIV:
3732             if (!expired) {
3733                 block_if_unrecoverable(history);
3734             }
3735             remap_because(history, &why, PCMK_EXEC_ERROR_HARD, "exit status");
3736             break;
3737 
3738         default:
3739             if (history->execution_status == PCMK_EXEC_DONE) {
3740                 char *last_change_s = last_change_str(history->xml);
3741 
3742                 crm_info("Treating unknown exit status %d from %s of %s "
3743                          "on %s at %s as failure",
3744                          history->exit_status, task, history->rsc->id,
3745                          pe__node_name(history->node), last_change_s);
3746                 remap_because(history, &why, PCMK_EXEC_ERROR,
3747                               "unknown exit status");
3748                 free(last_change_s);
3749             }
3750             break;
3751     }
3752 
3753 remap_done:
3754     if (why != NULL) {
3755         pe_rsc_trace(history->rsc,
3756                      "Remapped %s result from [%s: %s] to [%s: %s] "
3757                      "because of %s",
3758                      history->key, pcmk_exec_status_str(orig_exec_status),
3759                      crm_exit_str(orig_exit_status),
3760                      pcmk_exec_status_str(history->execution_status),
3761                      crm_exit_str(history->exit_status), why);
3762     }
3763 }
3764 
3765 // return TRUE if start or monitor last failure but parameters changed
3766 static bool
3767 should_clear_for_param_change(const xmlNode *xml_op, const char *task,
     /* [previous][next][first][last][top][bottom][index][help] */
3768                               pe_resource_t *rsc, pe_node_t *node)
3769 {
3770     if (!strcmp(task, "start") || !strcmp(task, "monitor")) {
3771 
3772         if (pe__bundle_needs_remote_name(rsc)) {
3773             /* We haven't allocated resources yet, so we can't reliably
3774              * substitute addr parameters for the REMOTE_CONTAINER_HACK.
3775              * When that's needed, defer the check until later.
3776              */
3777             pe__add_param_check(xml_op, rsc, node, pe_check_last_failure,
3778                                 rsc->cluster);
3779 
3780         } else {
3781             op_digest_cache_t *digest_data = NULL;
3782 
3783             digest_data = rsc_action_digest_cmp(rsc, xml_op, node,
3784                                                 rsc->cluster);
3785             switch (digest_data->rc) {
3786                 case RSC_DIGEST_UNKNOWN:
3787                     crm_trace("Resource %s history entry %s on %s"
3788                               " has no digest to compare",
3789                               rsc->id, pe__xe_history_key(xml_op),
3790                               node->details->id);
3791                     break;
3792                 case RSC_DIGEST_MATCH:
3793                     break;
3794                 default:
3795                     return TRUE;
3796             }
3797         }
3798     }
3799     return FALSE;
3800 }
3801 
3802 // Order action after fencing of remote node, given connection rsc
3803 static void
3804 order_after_remote_fencing(pe_action_t *action, pe_resource_t *remote_conn,
     /* [previous][next][first][last][top][bottom][index][help] */
3805                            pe_working_set_t *data_set)
3806 {
3807     pe_node_t *remote_node = pe_find_node(data_set->nodes, remote_conn->id);
3808 
3809     if (remote_node) {
3810         pe_action_t *fence = pe_fence_op(remote_node, NULL, TRUE, NULL,
3811                                          FALSE, data_set);
3812 
3813         order_actions(fence, action, pe_order_implies_then);
3814     }
3815 }
3816 
3817 static bool
3818 should_ignore_failure_timeout(const pe_resource_t *rsc, const char *task,
     /* [previous][next][first][last][top][bottom][index][help] */
3819                               guint interval_ms, bool is_last_failure)
3820 {
3821     /* Clearing failures of recurring monitors has special concerns. The
3822      * executor reports only changes in the monitor result, so if the
3823      * monitor is still active and still getting the same failure result,
3824      * that will go undetected after the failure is cleared.
3825      *
3826      * Also, the operation history will have the time when the recurring
3827      * monitor result changed to the given code, not the time when the
3828      * result last happened.
3829      *
3830      * @TODO We probably should clear such failures only when the failure
3831      * timeout has passed since the last occurrence of the failed result.
3832      * However we don't record that information. We could maybe approximate
3833      * that by clearing only if there is a more recent successful monitor or
3834      * stop result, but we don't even have that information at this point
3835      * since we are still unpacking the resource's operation history.
3836      *
3837      * This is especially important for remote connection resources with a
3838      * reconnect interval, so in that case, we skip clearing failures
3839      * if the remote node hasn't been fenced.
3840      */
3841     if (rsc->remote_reconnect_ms
3842         && pcmk_is_set(rsc->cluster->flags, pe_flag_stonith_enabled)
3843         && (interval_ms != 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3844 
3845         pe_node_t *remote_node = pe_find_node(rsc->cluster->nodes, rsc->id);
3846 
3847         if (remote_node && !remote_node->details->remote_was_fenced) {
3848             if (is_last_failure) {
3849                 crm_info("Waiting to clear monitor failure for remote node %s"
3850                          " until fencing has occurred", rsc->id);
3851             }
3852             return TRUE;
3853         }
3854     }
3855     return FALSE;
3856 }
3857 
3858 /*!
3859  * \internal
3860  * \brief Check operation age and schedule failure clearing when appropriate
3861  *
3862  * This function has two distinct purposes. The first is to check whether an
3863  * operation history entry is expired (i.e. the resource has a failure timeout,
3864  * the entry is older than the timeout, and the resource either has no fail
3865  * count or its fail count is entirely older than the timeout). The second is to
3866  * schedule fail count clearing when appropriate (i.e. the operation is expired
3867  * and either the resource has an expired fail count or the operation is a
3868  * last_failure for a remote connection resource with a reconnect interval,
3869  * or the operation is a last_failure for a start or monitor operation and the
3870  * resource's parameters have changed since the operation).
3871  *
3872  * \param[in,out] history  Parsed action result history
3873  *
3874  * \return true if operation history entry is expired, otherwise false
3875  */
3876 static bool
3877 check_operation_expiry(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
3878 {
3879     bool expired = false;
3880     bool is_last_failure = pcmk__ends_with(history->id, "_last_failure_0");
3881     time_t last_run = 0;
3882     int unexpired_fail_count = 0;
3883     const char *clear_reason = NULL;
3884 
3885     if (history->execution_status == PCMK_EXEC_NOT_INSTALLED) {
3886         pe_rsc_trace(history->rsc,
3887                      "Resource history entry %s on %s is not expired: "
3888                      "Not Installed does not expire",
3889                      history->id, pe__node_name(history->node));
3890         return false; // "Not installed" must always be cleared manually
3891     }
3892 
3893     if ((history->rsc->failure_timeout > 0)
3894         && (crm_element_value_epoch(history->xml, XML_RSC_OP_LAST_CHANGE,
3895                                     &last_run) == 0)) {
3896 
3897         // Resource has a failure-timeout, and history entry has a timestamp
3898 
3899         time_t now = get_effective_time(history->rsc->cluster);
3900         time_t last_failure = 0;
3901 
3902         // Is this particular operation history older than the failure timeout?
3903         if ((now >= (last_run + history->rsc->failure_timeout))
3904             && !should_ignore_failure_timeout(history->rsc, history->task,
3905                                               history->interval_ms,
3906                                               is_last_failure)) {
3907             expired = true;
3908         }
3909 
3910         // Does the resource as a whole have an unexpired fail count?
3911         unexpired_fail_count = pe_get_failcount(history->node, history->rsc,
3912                                                 &last_failure, pe_fc_effective,
3913                                                 history->xml);
3914 
3915         // Update scheduler recheck time according to *last* failure
3916         crm_trace("%s@%lld is %sexpired @%lld with unexpired_failures=%d timeout=%ds"
3917                   " last-failure@%lld",
3918                   history->id, (long long) last_run, (expired? "" : "not "),
3919                   (long long) now, unexpired_fail_count,
3920                   history->rsc->failure_timeout, (long long) last_failure);
3921         last_failure += history->rsc->failure_timeout + 1;
3922         if (unexpired_fail_count && (now < last_failure)) {
3923             pe__update_recheck_time(last_failure, history->rsc->cluster);
3924         }
3925     }
3926 
3927     if (expired) {
3928         if (pe_get_failcount(history->node, history->rsc, NULL, pe_fc_default,
3929                              history->xml)) {
3930             // There is a fail count ignoring timeout
3931 
3932             if (unexpired_fail_count == 0) {
3933                 // There is no fail count considering timeout
3934                 clear_reason = "it expired";
3935 
3936             } else {
3937                 /* This operation is old, but there is an unexpired fail count.
3938                  * In a properly functioning cluster, this should only be
3939                  * possible if this operation is not a failure (otherwise the
3940                  * fail count should be expired too), so this is really just a
3941                  * failsafe.
3942                  */
3943                 pe_rsc_trace(history->rsc,
3944                              "Resource history entry %s on %s is not expired: "
3945                              "Unexpired fail count",
3946                              history->id, pe__node_name(history->node));
3947                 expired = false;
3948             }
3949 
3950         } else if (is_last_failure
3951                    && (history->rsc->remote_reconnect_ms != 0)) {
3952             /* Clear any expired last failure when reconnect interval is set,
3953              * even if there is no fail count.
3954              */
3955             clear_reason = "reconnect interval is set";
3956         }
3957     }
3958 
3959     if (!expired && is_last_failure
3960         && should_clear_for_param_change(history->xml, history->task,
3961                                          history->rsc, history->node)) {
3962         clear_reason = "resource parameters have changed";
3963     }
3964 
3965     if (clear_reason != NULL) {
3966         // Schedule clearing of the fail count
3967         pe_action_t *clear_op = pe__clear_failcount(history->rsc, history->node,
3968                                                     clear_reason,
3969                                                     history->rsc->cluster);
3970 
3971         if (pcmk_is_set(history->rsc->cluster->flags, pe_flag_stonith_enabled)
3972             && (history->rsc->remote_reconnect_ms != 0)) {
3973             /* If we're clearing a remote connection due to a reconnect
3974              * interval, we want to wait until any scheduled fencing
3975              * completes.
3976              *
3977              * We could limit this to remote_node->details->unclean, but at
3978              * this point, that's always true (it won't be reliable until
3979              * after unpack_node_history() is done).
3980              */
3981             crm_info("Clearing %s failure will wait until any scheduled "
3982                      "fencing of %s completes",
3983                      history->task, history->rsc->id);
3984             order_after_remote_fencing(clear_op, history->rsc,
3985                                        history->rsc->cluster);
3986         }
3987     }
3988 
3989     if (expired && (history->interval_ms == 0)
3990         && pcmk__str_eq(history->task, CRMD_ACTION_STATUS, pcmk__str_none)) {
3991         switch (history->exit_status) {
3992             case PCMK_OCF_OK:
3993             case PCMK_OCF_NOT_RUNNING:
3994             case PCMK_OCF_RUNNING_PROMOTED:
3995             case PCMK_OCF_DEGRADED:
3996             case PCMK_OCF_DEGRADED_PROMOTED:
3997                 // Don't expire probes that return these values
3998                 pe_rsc_trace(history->rsc,
3999                              "Resource history entry %s on %s is not expired: "
4000                              "Probe result",
4001                              history->id, pe__node_name(history->node));
4002                 expired = false;
4003                 break;
4004         }
4005     }
4006 
4007     return expired;
4008 }
4009 
4010 int
4011 pe__target_rc_from_xml(const xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
4012 {
4013     int target_rc = 0;
4014     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
4015 
4016     if (key == NULL) {
4017         return -1;
4018     }
4019     decode_transition_key(key, NULL, NULL, NULL, &target_rc);
4020     return target_rc;
4021 }
4022 
4023 /*!
4024  * \internal
4025  * \brief Get the failure handling for an action
4026  *
4027  * \param[in,out] history  Parsed action history entry
4028  *
4029  * \return Failure handling appropriate to action
4030  */
4031 static enum action_fail_response
4032 get_action_on_fail(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
4033 {
4034     enum action_fail_response result = action_fail_recover;
4035     pe_action_t *action = custom_action(history->rsc, strdup(history->key),
4036                                         history->task, NULL, TRUE, FALSE,
4037                                         history->rsc->cluster);
4038 
4039     result = action->on_fail;
4040     pe_free_action(action);
4041     return result;
4042 }
4043 
4044 /*!
4045  * \internal
4046  * \brief Update a resource's state for an action result
4047  *
4048  * \param[in,out] history       Parsed action history entry
4049  * \param[in]     exit_status   Exit status to base new state on
4050  * \param[in]     last_failure  Resource's last_failure entry, if known
4051  * \param[in,out] on_fail       Resource's current failure handling
4052  */
4053 static void
4054 update_resource_state(struct action_history *history, int exit_status,
     /* [previous][next][first][last][top][bottom][index][help] */
4055                       const xmlNode *last_failure,
4056                       enum action_fail_response *on_fail)
4057 {
4058     bool clear_past_failure = false;
4059 
4060     if ((exit_status == PCMK_OCF_NOT_INSTALLED)
4061         || (!pe_rsc_is_bundled(history->rsc)
4062             && pcmk_xe_mask_probe_failure(history->xml))) {
4063         history->rsc->role = RSC_ROLE_STOPPED;
4064 
4065     } else if (exit_status == PCMK_OCF_NOT_RUNNING) {
4066         clear_past_failure = true;
4067 
4068     } else if (pcmk__str_eq(history->task, CRMD_ACTION_STATUS,
4069                             pcmk__str_none)) {
4070         if ((last_failure != NULL)
4071             && pcmk__str_eq(history->key, pe__xe_history_key(last_failure),
4072                             pcmk__str_none)) {
4073             clear_past_failure = true;
4074         }
4075         if (history->rsc->role < RSC_ROLE_STARTED) {
4076             set_active(history->rsc);
4077         }
4078 
4079     } else if (pcmk__str_eq(history->task, CRMD_ACTION_START, pcmk__str_none)) {
4080         history->rsc->role = RSC_ROLE_STARTED;
4081         clear_past_failure = true;
4082 
4083     } else if (pcmk__str_eq(history->task, CRMD_ACTION_STOP, pcmk__str_none)) {
4084         history->rsc->role = RSC_ROLE_STOPPED;
4085         clear_past_failure = true;
4086 
4087     } else if (pcmk__str_eq(history->task, CRMD_ACTION_PROMOTE,
4088                             pcmk__str_none)) {
4089         history->rsc->role = RSC_ROLE_PROMOTED;
4090         clear_past_failure = true;
4091 
4092     } else if (pcmk__str_eq(history->task, CRMD_ACTION_DEMOTE,
4093                             pcmk__str_none)) {
4094         if (*on_fail == action_fail_demote) {
4095             // Demote clears an error only if on-fail=demote
4096             clear_past_failure = true;
4097         }
4098         history->rsc->role = RSC_ROLE_UNPROMOTED;
4099 
4100     } else if (pcmk__str_eq(history->task, CRMD_ACTION_MIGRATED,
4101                             pcmk__str_none)) {
4102         history->rsc->role = RSC_ROLE_STARTED;
4103         clear_past_failure = true;
4104 
4105     } else if (pcmk__str_eq(history->task, CRMD_ACTION_MIGRATE,
4106                             pcmk__str_none)) {
4107         unpack_migrate_to_success(history);
4108 
4109     } else if (history->rsc->role < RSC_ROLE_STARTED) {
4110         pe_rsc_trace(history->rsc, "%s active on %s",
4111                      history->rsc->id, pe__node_name(history->node));
4112         set_active(history->rsc);
4113     }
4114 
4115     if (!clear_past_failure) {
4116         return;
4117     }
4118 
4119     switch (*on_fail) {
4120         case action_fail_stop:
4121         case action_fail_fence:
4122         case action_fail_migrate:
4123         case action_fail_standby:
4124             pe_rsc_trace(history->rsc,
4125                          "%s (%s) is not cleared by a completed %s",
4126                          history->rsc->id, fail2text(*on_fail), history->task);
4127             break;
4128 
4129         case action_fail_block:
4130         case action_fail_ignore:
4131         case action_fail_demote:
4132         case action_fail_recover:
4133         case action_fail_restart_container:
4134             *on_fail = action_fail_ignore;
4135             pe__set_next_role(history->rsc, RSC_ROLE_UNKNOWN,
4136                               "clear past failures");
4137             break;
4138 
4139         case action_fail_reset_remote:
4140             if (history->rsc->remote_reconnect_ms == 0) {
4141                 /* With no reconnect interval, the connection is allowed to
4142                  * start again after the remote node is fenced and
4143                  * completely stopped. (With a reconnect interval, we wait
4144                  * for the failure to be cleared entirely before attempting
4145                  * to reconnect.)
4146                  */
4147                 *on_fail = action_fail_ignore;
4148                 pe__set_next_role(history->rsc, RSC_ROLE_UNKNOWN,
4149                                   "clear past failures and reset remote");
4150             }
4151             break;
4152     }
4153 }
4154 
4155 /*!
4156  * \internal
4157  * \brief Check whether a given history entry matters for resource state
4158  *
4159  * \param[in] history  Parsed action history entry
4160  *
4161  * \return true if action can affect resource state, otherwise false
4162  */
4163 static inline bool
4164 can_affect_state(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
4165 {
4166 #if 0
4167     /* @COMPAT It might be better to parse only actions we know we're interested
4168      * in, rather than exclude a couple we don't. However that would be a
4169      * behavioral change that should be done at a major or minor series release.
4170      * Currently, unknown operations can affect whether a resource is considered
4171      * active and/or failed.
4172      */
4173      return pcmk__str_any_of(history->task, CRMD_ACTION_STATUS,
4174                              CRMD_ACTION_START, CRMD_ACTION_STOP,
4175                              CRMD_ACTION_PROMOTE, CRMD_ACTION_DEMOTE,
4176                              CRMD_ACTION_MIGRATE, CRMD_ACTION_MIGRATED,
4177                              "asyncmon", NULL);
4178 #else
4179      return !pcmk__str_any_of(history->task, CRMD_ACTION_NOTIFY,
4180                               CRMD_ACTION_METADATA, NULL);
4181 #endif
4182 }
4183 
4184 /*!
4185  * \internal
4186  * \brief Unpack execution/exit status and exit reason from a history entry
4187  *
4188  * \param[in,out] history  Action history entry to unpack
4189  *
4190  * \return Standard Pacemaker return code
4191  */
4192 static int
4193 unpack_action_result(struct action_history *history)
     /* [previous][next][first][last][top][bottom][index][help] */
4194 {
4195     if ((crm_element_value_int(history->xml, XML_LRM_ATTR_OPSTATUS,
4196                                &(history->execution_status)) < 0)
4197         || (history->execution_status < PCMK_EXEC_PENDING)
4198         || (history->execution_status > PCMK_EXEC_MAX)
4199         || (history->execution_status == PCMK_EXEC_CANCELLED)) {
4200         crm_err("Ignoring resource history entry %s for %s on %s "
4201                 "with invalid " XML_LRM_ATTR_OPSTATUS " '%s'",
4202                 history->id, history->rsc->id, pe__node_name(history->node),
4203                 pcmk__s(crm_element_value(history->xml, XML_LRM_ATTR_OPSTATUS),
4204                         ""));
4205         return pcmk_rc_unpack_error;
4206     }
4207     if ((crm_element_value_int(history->xml, XML_LRM_ATTR_RC,
4208                                &(history->exit_status)) < 0)
4209         || (history->exit_status < 0) || (history->exit_status > CRM_EX_MAX)) {
4210 #if 0
4211         /* @COMPAT We should ignore malformed entries, but since that would
4212          * change behavior, it should be done at a major or minor series
4213          * release.
4214          */
4215         crm_err("Ignoring resource history entry %s for %s on %s "
4216                 "with invalid " XML_LRM_ATTR_RC " '%s'",
4217                 history->id, history->rsc->id, pe__node_name(history->node),
4218                 pcmk__s(crm_element_value(history->xml, XML_LRM_ATTR_RC),
4219                         ""));
4220         return pcmk_rc_unpack_error;
4221 #else
4222         history->exit_status = CRM_EX_ERROR;
4223 #endif
4224     }
4225     history->exit_reason = crm_element_value(history->xml,
4226                                              XML_LRM_ATTR_EXIT_REASON);
4227     return pcmk_rc_ok;
4228 }
4229 
4230 /*!
4231  * \internal
4232  * \brief Process an action history entry whose result expired
4233  *
4234  * \param[in,out] history           Parsed action history entry
4235  * \param[in]     orig_exit_status  Action exit status before remapping
4236  *
4237  * \return Standard Pacemaker return code (in particular, pcmk_rc_ok means the
4238  *         entry needs no further processing)
4239  */
4240 static int
4241 process_expired_result(struct action_history *history, int orig_exit_status)
     /* [previous][next][first][last][top][bottom][index][help] */
4242 {
4243     if (!pe_rsc_is_bundled(history->rsc)
4244         && pcmk_xe_mask_probe_failure(history->xml)
4245         && (orig_exit_status != history->expected_exit_status)) {
4246 
4247         if (history->rsc->role <= RSC_ROLE_STOPPED) {
4248             history->rsc->role = RSC_ROLE_UNKNOWN;
4249         }
4250         crm_trace("Ignoring resource history entry %s for probe of %s on %s: "
4251                   "Masked failure expired",
4252                   history->id, history->rsc->id,
4253                   pe__node_name(history->node));
4254         return pcmk_rc_ok;
4255     }
4256 
4257     if (history->exit_status == history->expected_exit_status) {
4258         return pcmk_rc_undetermined; // Only failures expire
4259     }
4260 
4261     if (history->interval_ms == 0) {
4262         crm_notice("Ignoring resource history entry %s for %s of %s on %s: "
4263                    "Expired failure",
4264                    history->id, history->task, history->rsc->id,
4265                    pe__node_name(history->node));
4266         return pcmk_rc_ok;
4267     }
4268 
4269     if (history->node->details->online && !history->node->details->unclean) {
4270         /* Reschedule the recurring action. schedule_cancel() won't work at
4271          * this stage, so as a hacky workaround, forcibly change the restart
4272          * digest so pcmk__check_action_config() does what we want later.
4273          *
4274          * @TODO We should skip this if there is a newer successful monitor.
4275          *       Also, this causes rescheduling only if the history entry
4276          *       has an op-digest (which the expire-non-blocked-failure
4277          *       scheduler regression test doesn't, but that may not be a
4278          *       realistic scenario in production).
4279          */
4280         crm_notice("Rescheduling %s-interval %s of %s on %s "
4281                    "after failure expired",
4282                    pcmk__readable_interval(history->interval_ms), history->task,
4283                    history->rsc->id, pe__node_name(history->node));
4284         crm_xml_add(history->xml, XML_LRM_ATTR_RESTART_DIGEST,
4285                     "calculated-failure-timeout");
4286         return pcmk_rc_ok;
4287     }
4288 
4289     return pcmk_rc_undetermined;
4290 }
4291 
4292 /*!
4293  * \internal
4294  * \brief Process a masked probe failure
4295  *
4296  * \param[in,out] history           Parsed action history entry
4297  * \param[in]     orig_exit_status  Action exit status before remapping
4298  * \param[in]     last_failure      Resource's last_failure entry, if known
4299  * \param[in,out] on_fail           Resource's current failure handling
4300  */
4301 static void
4302 mask_probe_failure(struct action_history *history, int orig_exit_status,
     /* [previous][next][first][last][top][bottom][index][help] */
4303                    const xmlNode *last_failure,
4304                    enum action_fail_response *on_fail)
4305 {
4306     pe_resource_t *ban_rsc = history->rsc;
4307 
4308     if (!pcmk_is_set(history->rsc->flags, pe_rsc_unique)) {
4309         ban_rsc = uber_parent(history->rsc);
4310     }
4311 
4312     crm_notice("Treating probe result '%s' for %s on %s as 'not running'",
4313                services_ocf_exitcode_str(orig_exit_status), history->rsc->id,
4314                pe__node_name(history->node));
4315     update_resource_state(history, history->expected_exit_status, last_failure,
4316                           on_fail);
4317     crm_xml_add(history->xml, XML_ATTR_UNAME, history->node->details->uname);
4318 
4319     record_failed_op(history);
4320     resource_location(ban_rsc, history->node, -INFINITY, "masked-probe-failure",
4321                       history->rsc->cluster);
4322 }
4323 
4324 /*!
4325  * \internal Check whether a given failure is for a given pending action
4326  *
4327  * \param[in] history       Parsed history entry for pending action
4328  * \param[in] last_failure  Resource's last_failure entry, if known
4329  *
4330  * \return true if \p last_failure is failure of pending action in \p history,
4331  *         otherwise false
4332  * \note Both \p history and \p last_failure must come from the same
4333  *       lrm_resource block, as node and resource are assumed to be the same.
4334  */
4335 static bool
4336 failure_is_newer(const struct action_history *history,
     /* [previous][next][first][last][top][bottom][index][help] */
4337                  const xmlNode *last_failure)
4338 {
4339     guint failure_interval_ms = 0U;
4340     long long failure_change = 0LL;
4341     long long this_change = 0LL;
4342 
4343     if (last_failure == NULL) {
4344         return false; // Resource has no last_failure entry
4345     }
4346 
4347     if (!pcmk__str_eq(history->task,
4348                       crm_element_value(last_failure, XML_LRM_ATTR_TASK),
4349                       pcmk__str_none)) {
4350         return false; // last_failure is for different action
4351     }
4352 
4353     if ((crm_element_value_ms(last_failure, XML_LRM_ATTR_INTERVAL_MS,
4354                               &failure_interval_ms) != pcmk_ok)
4355         || (history->interval_ms != failure_interval_ms)) {
4356         return false; // last_failure is for action with different interval
4357     }
4358 
4359     if ((pcmk__scan_ll(crm_element_value(history->xml, XML_RSC_OP_LAST_CHANGE),
4360                        &this_change, 0LL) != pcmk_rc_ok)
4361         || (pcmk__scan_ll(crm_element_value(last_failure,
4362                                             XML_RSC_OP_LAST_CHANGE),
4363                           &failure_change, 0LL) != pcmk_rc_ok)
4364         || (failure_change < this_change)) {
4365         return false; // Failure is not known to be newer
4366     }
4367 
4368     return true;
4369 }
4370 
4371 /*!
4372  * \internal
4373  * \brief Update a resource's role etc. for a pending action
4374  *
4375  * \param[in,out] history       Parsed history entry for pending action
4376  * \param[in]     last_failure  Resource's last_failure entry, if known
4377  */
4378 static void
4379 process_pending_action(struct action_history *history,
     /* [previous][next][first][last][top][bottom][index][help] */
4380                        const xmlNode *last_failure)
4381 {
4382     /* For recurring monitors, a failure is recorded only in RSC_last_failure_0,
4383      * and there might be a RSC_monitor_INTERVAL entry with the last successful
4384      * or pending result.
4385      *
4386      * If last_failure contains the failure of the pending recurring monitor
4387      * we're processing here, and is newer, the action is no longer pending.
4388      * (Pending results have call ID -1, which sorts last, so the last failure
4389      * if any should be known.)
4390      */
4391     if (failure_is_newer(history, last_failure)) {
4392         return;
4393     }
4394 
4395     if (strcmp(history->task, CRMD_ACTION_START) == 0) {
4396         pe__set_resource_flags(history->rsc, pe_rsc_start_pending);
4397         set_active(history->rsc);
4398 
4399     } else if (strcmp(history->task, CRMD_ACTION_PROMOTE) == 0) {
4400         history->rsc->role = RSC_ROLE_PROMOTED;
4401 
4402     } else if ((strcmp(history->task, CRMD_ACTION_MIGRATE) == 0)
4403                && history->node->details->unclean) {
4404         /* A migrate_to action is pending on a unclean source, so force a stop
4405          * on the target.
4406          */
4407         const char *migrate_target = NULL;
4408         pe_node_t *target = NULL;
4409 
4410         migrate_target = crm_element_value(history->xml,
4411                                            XML_LRM_ATTR_MIGRATE_TARGET);
4412         target = pe_find_node(history->rsc->cluster->nodes, migrate_target);
4413         if (target != NULL) {
4414             stop_action(history->rsc, target, FALSE);
4415         }
4416     }
4417 
4418     if (history->rsc->pending_task != NULL) {
4419         /* There should never be multiple pending actions, but as a failsafe,
4420          * just remember the first one processed for display purposes.
4421          */
4422         return;
4423     }
4424 
4425     if (pcmk_is_probe(history->task, history->interval_ms)) {
4426         /* Pending probes are currently never displayed, even if pending
4427          * operations are requested. If we ever want to change that,
4428          * enable the below and the corresponding part of
4429          * native.c:native_pending_task().
4430          */
4431 #if 0
4432         history->rsc->pending_task = strdup("probe");
4433         history->rsc->pending_node = history->node;
4434 #endif
4435     } else {
4436         history->rsc->pending_task = strdup(history->task);
4437         history->rsc->pending_node = history->node;
4438     }
4439 }
4440 
4441 static void
4442 unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
4443               xmlNode **last_failure, enum action_fail_response *on_fail)
4444 {
4445     int old_rc = 0;
4446     bool expired = false;
4447     pe_resource_t *parent = rsc;
4448     enum action_fail_response failure_strategy = action_fail_recover;
4449 
4450     struct action_history history = {
4451         .rsc = rsc,
4452         .node = node,
4453         .xml = xml_op,
4454         .execution_status = PCMK_EXEC_UNKNOWN,
4455     };
4456 
4457     CRM_CHECK(rsc && node && xml_op, return);
4458 
4459     history.id = ID(xml_op);
4460     if (history.id == NULL) {
4461         crm_err("Ignoring resource history entry for %s on %s without ID",
4462                 rsc->id, pe__node_name(node));
4463         return;
4464     }
4465 
4466     // Task and interval
4467     history.task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
4468     if (history.task == NULL) {
4469         crm_err("Ignoring resource history entry %s for %s on %s without "
4470                 XML_LRM_ATTR_TASK, history.id, rsc->id, pe__node_name(node));
4471         return;
4472     }
4473     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS,
4474                          &(history.interval_ms));
4475     if (!can_affect_state(&history)) {
4476         pe_rsc_trace(rsc,
4477                      "Ignoring resource history entry %s for %s on %s "
4478                      "with irrelevant action '%s'",
4479                      history.id, rsc->id, pe__node_name(node), history.task);
4480         return;
4481     }
4482 
4483     if (unpack_action_result(&history) != pcmk_rc_ok) {
4484         return; // Error already logged
4485     }
4486 
4487     history.expected_exit_status = pe__target_rc_from_xml(xml_op);
4488     history.key = pe__xe_history_key(xml_op);
4489     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &(history.call_id));
4490 
4491     pe_rsc_trace(rsc, "Unpacking %s (%s call %d on %s): %s (%s)",
4492                  history.id, history.task, history.call_id, pe__node_name(node),
4493                  pcmk_exec_status_str(history.execution_status),
4494                  crm_exit_str(history.exit_status));
4495 
4496     if (node->details->unclean) {
4497         pe_rsc_trace(rsc,
4498                      "%s is running on %s, which is unclean (further action "
4499                      "depends on value of stop's on-fail attribute)",
4500                      rsc->id, pe__node_name(node));
4501     }
4502 
4503     expired = check_operation_expiry(&history);
4504     old_rc = history.exit_status;
4505 
4506     remap_operation(&history, on_fail, expired);
4507 
4508     if (expired && (process_expired_result(&history, old_rc) == pcmk_rc_ok)) {
4509         goto done;
4510     }
4511 
4512     if (!pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op)) {
4513         mask_probe_failure(&history, old_rc, *last_failure, on_fail);
4514         goto done;
4515     }
4516 
4517     if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
4518         parent = uber_parent(rsc);
4519     }
4520 
4521     switch (history.execution_status) {
4522         case PCMK_EXEC_PENDING:
4523             process_pending_action(&history, *last_failure);
4524             goto done;
4525 
4526         case PCMK_EXEC_DONE:
4527             update_resource_state(&history, history.exit_status, *last_failure,
4528                                   on_fail);
4529             goto done;
4530 
4531         case PCMK_EXEC_NOT_INSTALLED:
4532             failure_strategy = get_action_on_fail(&history);
4533             if (failure_strategy == action_fail_ignore) {
4534                 crm_warn("Cannot ignore failed %s of %s on %s: "
4535                          "Resource agent doesn't exist "
4536                          CRM_XS " status=%d rc=%d id=%s",
4537                          history.task, rsc->id, pe__node_name(node),
4538                          history.execution_status, history.exit_status,
4539                          history.id);
4540                 /* Also for printing it as "FAILED" by marking it as pe_rsc_failed later */
4541                 *on_fail = action_fail_migrate;
4542             }
4543             resource_location(parent, node, -INFINITY, "hard-error",
4544                               rsc->cluster);
4545             unpack_rsc_op_failure(&history, last_failure, on_fail);
4546             goto done;
4547 
4548         case PCMK_EXEC_NOT_CONNECTED:
4549             if (pe__is_guest_or_remote_node(node)
4550                 && pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_managed)) {
4551                 /* We should never get into a situation where a managed remote
4552                  * connection resource is considered OK but a resource action
4553                  * behind the connection gets a "not connected" status. But as a
4554                  * fail-safe in case a bug or unusual circumstances do lead to
4555                  * that, ensure the remote connection is considered failed.
4556                  */
4557                 pe__set_resource_flags(node->details->remote_rsc,
4558                                        pe_rsc_failed|pe_rsc_stop);
4559             }
4560             break; // Not done, do error handling
4561 
4562         case PCMK_EXEC_ERROR:
4563         case PCMK_EXEC_ERROR_HARD:
4564         case PCMK_EXEC_ERROR_FATAL:
4565         case PCMK_EXEC_TIMEOUT:
4566         case PCMK_EXEC_NOT_SUPPORTED:
4567         case PCMK_EXEC_INVALID:
4568             break; // Not done, do error handling
4569 
4570         default: // No other value should be possible at this point
4571             break;
4572     }
4573 
4574     failure_strategy = get_action_on_fail(&history);
4575     if ((failure_strategy == action_fail_ignore)
4576         || (failure_strategy == action_fail_restart_container
4577             && (strcmp(history.task, CRMD_ACTION_STOP) == 0))) {
4578 
4579         char *last_change_s = last_change_str(xml_op);
4580 
4581         crm_warn("Pretending failed %s (%s%s%s) of %s on %s at %s succeeded "
4582                  CRM_XS " %s",
4583                  history.task, services_ocf_exitcode_str(history.exit_status),
4584                  (pcmk__str_empty(history.exit_reason)? "" : ": "),
4585                  pcmk__s(history.exit_reason, ""), rsc->id, pe__node_name(node),
4586                  last_change_s, history.id);
4587         free(last_change_s);
4588 
4589         update_resource_state(&history, history.expected_exit_status,
4590                               *last_failure, on_fail);
4591         crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
4592         pe__set_resource_flags(rsc, pe_rsc_failure_ignored);
4593 
4594         record_failed_op(&history);
4595 
4596         if ((failure_strategy == action_fail_restart_container)
4597             && cmp_on_fail(*on_fail, action_fail_recover) <= 0) {
4598             *on_fail = failure_strategy;
4599         }
4600 
4601     } else {
4602         unpack_rsc_op_failure(&history, last_failure, on_fail);
4603 
4604         if (history.execution_status == PCMK_EXEC_ERROR_HARD) {
4605             uint8_t log_level = LOG_ERR;
4606 
4607             if (history.exit_status == PCMK_OCF_NOT_INSTALLED) {
4608                 log_level = LOG_NOTICE;
4609             }
4610             do_crm_log(log_level,
4611                        "Preventing %s from restarting on %s because "
4612                        "of hard failure (%s%s%s) " CRM_XS " %s",
4613                        parent->id, pe__node_name(node),
4614                        services_ocf_exitcode_str(history.exit_status),
4615                        (pcmk__str_empty(history.exit_reason)? "" : ": "),
4616                        pcmk__s(history.exit_reason, ""), history.id);
4617             resource_location(parent, node, -INFINITY, "hard-error",
4618                               rsc->cluster);
4619 
4620         } else if (history.execution_status == PCMK_EXEC_ERROR_FATAL) {
4621             crm_err("Preventing %s from restarting anywhere because "
4622                     "of fatal failure (%s%s%s) " CRM_XS " %s",
4623                     parent->id, services_ocf_exitcode_str(history.exit_status),
4624                     (pcmk__str_empty(history.exit_reason)? "" : ": "),
4625                     pcmk__s(history.exit_reason, ""), history.id);
4626             resource_location(parent, NULL, -INFINITY, "fatal-error",
4627                               rsc->cluster);
4628         }
4629     }
4630 
4631 done:
4632     pe_rsc_trace(rsc, "%s role on %s after %s is %s (next %s)",
4633                  rsc->id, pe__node_name(node), history.id,
4634                  role2text(rsc->role), role2text(rsc->next_role));
4635 }
4636 
4637 static void
4638 add_node_attrs(const xmlNode *xml_obj, pe_node_t *node, bool overwrite,
     /* [previous][next][first][last][top][bottom][index][help] */
4639                pe_working_set_t *data_set)
4640 {
4641     const char *cluster_name = NULL;
4642 
4643     pe_rule_eval_data_t rule_data = {
4644         .node_hash = NULL,
4645         .role = RSC_ROLE_UNKNOWN,
4646         .now = data_set->now,
4647         .match_data = NULL,
4648         .rsc_data = NULL,
4649         .op_data = NULL
4650     };
4651 
4652     g_hash_table_insert(node->details->attrs,
4653                         strdup(CRM_ATTR_UNAME), strdup(node->details->uname));
4654 
4655     g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_ID),
4656                         strdup(node->details->id));
4657     if (pcmk__str_eq(node->details->id, data_set->dc_uuid, pcmk__str_casei)) {
4658         data_set->dc_node = node;
4659         node->details->is_dc = TRUE;
4660         g_hash_table_insert(node->details->attrs,
4661                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_TRUE));
4662     } else {
4663         g_hash_table_insert(node->details->attrs,
4664                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_FALSE));
4665     }
4666 
4667     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
4668     if (cluster_name) {
4669         g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_CLUSTER_NAME),
4670                             strdup(cluster_name));
4671     }
4672 
4673     pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_ATTR_SETS, &rule_data,
4674                                node->details->attrs, NULL, overwrite, data_set);
4675 
4676     pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_UTILIZATION, &rule_data,
4677                                node->details->utilization, NULL,
4678                                FALSE, data_set);
4679 
4680     if (pe_node_attribute_raw(node, CRM_ATTR_SITE_NAME) == NULL) {
4681         const char *site_name = pe_node_attribute_raw(node, "site-name");
4682 
4683         if (site_name) {
4684             g_hash_table_insert(node->details->attrs,
4685                                 strdup(CRM_ATTR_SITE_NAME),
4686                                 strdup(site_name));
4687 
4688         } else if (cluster_name) {
4689             /* Default to cluster-name if unset */
4690             g_hash_table_insert(node->details->attrs,
4691                                 strdup(CRM_ATTR_SITE_NAME),
4692                                 strdup(cluster_name));
4693         }
4694     }
4695 }
4696 
4697 static GList *
4698 extract_operations(const char *node, const char *rsc, xmlNode * rsc_entry, gboolean active_filter)
     /* [previous][next][first][last][top][bottom][index][help] */
4699 {
4700     int counter = -1;
4701     int stop_index = -1;
4702     int start_index = -1;
4703 
4704     xmlNode *rsc_op = NULL;
4705 
4706     GList *gIter = NULL;
4707     GList *op_list = NULL;
4708     GList *sorted_op_list = NULL;
4709 
4710     /* extract operations */
4711     op_list = NULL;
4712     sorted_op_list = NULL;
4713 
4714     for (rsc_op = pcmk__xe_first_child(rsc_entry);
4715          rsc_op != NULL; rsc_op = pcmk__xe_next(rsc_op)) {
4716 
4717         if (pcmk__str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP,
4718                          pcmk__str_none)) {
4719             crm_xml_add(rsc_op, "resource", rsc);
4720             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
4721             op_list = g_list_prepend(op_list, rsc_op);
4722         }
4723     }
4724 
4725     if (op_list == NULL) {
4726         /* if there are no operations, there is nothing to do */
4727         return NULL;
4728     }
4729 
4730     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
4731 
4732     /* create active recurring operations as optional */
4733     if (active_filter == FALSE) {
4734         return sorted_op_list;
4735     }
4736 
4737     op_list = NULL;
4738 
4739     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
4740 
4741     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
4742         xmlNode *rsc_op = (xmlNode *) gIter->data;
4743 
4744         counter++;
4745 
4746         if (start_index < stop_index) {
4747             crm_trace("Skipping %s: not active", ID(rsc_entry));
4748             break;
4749 
4750         } else if (counter < start_index) {
4751             crm_trace("Skipping %s: old", ID(rsc_op));
4752             continue;
4753         }
4754         op_list = g_list_append(op_list, rsc_op);
4755     }
4756 
4757     g_list_free(sorted_op_list);
4758     return op_list;
4759 }
4760 
4761 GList *
4762 find_operations(const char *rsc, const char *node, gboolean active_filter,
     /* [previous][next][first][last][top][bottom][index][help] */
4763                 pe_working_set_t * data_set)
4764 {
4765     GList *output = NULL;
4766     GList *intermediate = NULL;
4767 
4768     xmlNode *tmp = NULL;
4769     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
4770 
4771     pe_node_t *this_node = NULL;
4772 
4773     xmlNode *node_state = NULL;
4774 
4775     for (node_state = pcmk__xe_first_child(status); node_state != NULL;
4776          node_state = pcmk__xe_next(node_state)) {
4777 
4778         if (pcmk__str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
4779             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
4780 
4781             if (node != NULL && !pcmk__str_eq(uname, node, pcmk__str_casei)) {
4782                 continue;
4783             }
4784 
4785             this_node = pe_find_node(data_set->nodes, uname);
4786             if(this_node == NULL) {
4787                 CRM_LOG_ASSERT(this_node != NULL);
4788                 continue;
4789 
4790             } else if (pe__is_guest_or_remote_node(this_node)) {
4791                 determine_remote_online_status(data_set, this_node);
4792 
4793             } else {
4794                 determine_online_status(node_state, this_node, data_set);
4795             }
4796 
4797             if (this_node->details->online
4798                 || pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
4799                 /* offline nodes run no resources...
4800                  * unless stonith is enabled in which case we need to
4801                  *   make sure rsc start events happen after the stonith
4802                  */
4803                 xmlNode *lrm_rsc = NULL;
4804 
4805                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
4806                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
4807 
4808                 for (lrm_rsc = pcmk__xe_first_child(tmp); lrm_rsc != NULL;
4809                      lrm_rsc = pcmk__xe_next(lrm_rsc)) {
4810 
4811                     if (pcmk__str_eq((const char *)lrm_rsc->name,
4812                                      XML_LRM_TAG_RESOURCE, pcmk__str_none)) {
4813 
4814                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
4815 
4816                         if (rsc != NULL && !pcmk__str_eq(rsc_id, rsc, pcmk__str_casei)) {
4817                             continue;
4818                         }
4819 
4820                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
4821                         output = g_list_concat(output, intermediate);
4822                     }
4823                 }
4824             }
4825         }
4826     }
4827 
4828     return output;
4829 }

/* [previous][next][first][last][top][bottom][index][help] */