root/lib/pengine/unpack.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. is_dangling_guest_node
  2. pe_fence_node
  3. set_if_xpath
  4. unpack_config
  5. pe_create_node
  6. expand_remote_rsc_meta
  7. handle_startup_fencing
  8. unpack_nodes
  9. setup_container
  10. unpack_remote_nodes
  11. link_rsc2remotenode
  12. destroy_tag
  13. unpack_resources
  14. unpack_tags
  15. unpack_ticket_state
  16. unpack_tickets_state
  17. unpack_handle_remote_attrs
  18. unpack_transient_attributes
  19. unpack_node_state
  20. unpack_node_history
  21. unpack_status
  22. determine_online_status_no_fencing
  23. determine_online_status_fencing
  24. determine_remote_online_status
  25. determine_online_status
  26. pe_base_name_end
  27. clone_strip
  28. clone_zero
  29. create_fake_resource
  30. create_anonymous_orphan
  31. find_anonymous_clone
  32. unpack_find_resource
  33. process_orphan_resource
  34. process_rsc_state
  35. process_recurring
  36. calculate_active_ops
  37. unpack_shutdown_lock
  38. unpack_lrm_resource
  39. handle_orphaned_container_fillers
  40. unpack_node_lrm
  41. set_active
  42. set_node_score
  43. find_lrm_op
  44. find_lrm_resource
  45. unknown_on_node
  46. monitor_not_running_after
  47. non_monitor_after
  48. newer_state_after_migrate
  49. unpack_migrate_to_success
  50. unpack_migrate_to_failure
  51. unpack_migrate_from_failure
  52. record_failed_op
  53. get_op_key
  54. last_change_str
  55. cmp_on_fail
  56. unpack_rsc_op_failure
  57. check_recoverable
  58. remap_operation
  59. should_clear_for_param_change
  60. order_after_remote_fencing
  61. should_ignore_failure_timeout
  62. check_operation_expiry
  63. pe__target_rc_from_xml
  64. get_action_on_fail
  65. update_resource_state
  66. unpack_rsc_op
  67. add_node_attrs
  68. extract_operations
  69. find_operations

   1 /*
   2  * Copyright 2004-2022 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdio.h>
  13 #include <string.h>
  14 #include <glib.h>
  15 #include <time.h>
  16 
  17 #include <crm/crm.h>
  18 #include <crm/services.h>
  19 #include <crm/msg_xml.h>
  20 #include <crm/common/xml.h>
  21 #include <crm/common/xml_internal.h>
  22 
  23 #include <crm/common/util.h>
  24 #include <crm/pengine/rules.h>
  25 #include <crm/pengine/internal.h>
  26 #include <pe_status_private.h>
  27 
  28 CRM_TRACE_INIT_DATA(pe_status);
  29 
  30 /* Set or clear a working set flag according to a boolean cluster option.
      *
      * The option's value is looked up in (data_set)->config_hash with pe_pref().
      * If that lookup returns NULL, the flag is left untouched; otherwise the
      * flag is set when crm_is_true() accepts the value and cleared when it does
      * not. Note that the data_set argument is evaluated more than once.
      *
  31  * This uses pcmk__set_flags_as()/pcmk__clear_flags_as() directly rather than
  32  * pe__set_working_set_flags()/pe__clear_working_set_flags() so that the
  33  * flag is stringified more readably in log messages.
      */
  34 #define set_config_flag(data_set, option, flag) do {                        \
  35         const char *scf_value = pe_pref((data_set)->config_hash, (option)); \
  36         if (scf_value != NULL) {                                            \
  37             if (crm_is_true(scf_value)) {                                   \
  38                 (data_set)->flags = pcmk__set_flags_as(__func__, __LINE__,  \
  39                                     LOG_TRACE, "Working set",               \
  40                                     crm_system_name, (data_set)->flags,     \
  41                                     (flag), #flag);                         \
  42             } else {                                                        \
  43                 (data_set)->flags = pcmk__clear_flags_as(__func__, __LINE__,\
  44                                     LOG_TRACE, "Working set",               \
  45                                     crm_system_name, (data_set)->flags,     \
  46                                     (flag), #flag);                         \
  47             }                                                               \
  48         }                                                                   \
  49     } while(0)
  50 // Forward declarations of static functions that are defined later in this file
  51 static void unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
  52                           xmlNode **last_failure,
  53                           enum action_fail_response *failed,
  54                           pe_working_set_t *data_set);
  55 static void determine_remote_online_status(pe_working_set_t *data_set,
  56                                            pe_node_t *this_node);
  57 static void add_node_attrs(xmlNode *attrs, pe_node_t *node, bool overwrite,
  58                            pe_working_set_t *data_set);
  59 static void determine_online_status(xmlNode *node_state, pe_node_t *this_node,
  60                                     pe_working_set_t *data_set);
  61 
  62 static void unpack_node_lrm(pe_node_t *node, xmlNode *xml,
  63                             pe_working_set_t *data_set);
  64 
  65 
  66 // Bitmask for warnings we only want to print once (not static, so it has
     // external linkage). NOTE(review): presumably tested and updated by
     // pe_warn_once() using the pe_wo_* bits -- confirm against that macro.
  67 uint32_t pe_wo = 0;
  68 
  69 static gboolean
  70 is_dangling_guest_node(pe_node_t *node)
     /* [previous][next][first][last][top][bottom][index][help] */
  71 {
  72     /* we are looking for a remote-node that was supposed to be mapped to a
  73      * container resource, but all traces of that container have disappeared 
  74      * from both the config and the status section. */
  75     if (pe__is_guest_or_remote_node(node) &&
  76         node->details->remote_rsc &&
  77         node->details->remote_rsc->container == NULL &&
  78         pcmk_is_set(node->details->remote_rsc->flags,
  79                     pe_rsc_orphan_container_filler)) {
  80         return TRUE;
  81     }
  82 
  83     return FALSE;
  84 }
  85 
  86 /*!
  87  * \brief Schedule a fence action for a node
  88  *
  89  * \param[in,out] data_set  Current working set of cluster
  90  * \param[in,out] node      Node to fence
  91  * \param[in]     reason    Text description of why fencing is needed
  92  * \param[in]     priority_delay  Whether to consider `priority-fencing-delay`
  93  */
  94 void
  95 pe_fence_node(pe_working_set_t * data_set, pe_node_t * node,
     /* [previous][next][first][last][top][bottom][index][help] */
  96               const char *reason, bool priority_delay)
  97 {
  98     CRM_CHECK(node, return);
  99 
 100     /* A guest node is fenced by marking its container as failed */
 101     if (pe__is_guest_node(node)) {
 102         pe_resource_t *rsc = node->details->remote_rsc->container;
 103 
 104         if (!pcmk_is_set(rsc->flags, pe_rsc_failed)) {
 105             if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 106                 crm_notice("Not fencing guest node %s "
 107                            "(otherwise would because %s): "
 108                            "its guest resource %s is unmanaged",
 109                            pe__node_name(node), reason, rsc->id);
 110             } else {
 111                 crm_warn("Guest node %s will be fenced "
 112                          "(by recovering its guest resource %s): %s",
 113                          pe__node_name(node), rsc->id, reason);
 114 
 115                 /* We don't mark the node as unclean because that would prevent the
 116                  * node from running resources. We want to allow it to run resources
 117                  * in this transition if the recovery succeeds.
 118                  */
 119                 node->details->remote_requires_reset = TRUE;
 120                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
 121             }
 122         }
 123 
 124     } else if (is_dangling_guest_node(node)) {
 125         crm_info("Cleaning up dangling connection for guest node %s: "
 126                  "fencing was already done because %s, "
 127                  "and guest resource no longer exists",
 128                  pe__node_name(node), reason);
 129         pe__set_resource_flags(node->details->remote_rsc,
 130                                pe_rsc_failed|pe_rsc_stop);
 131 
 132     } else if (pe__is_remote_node(node)) {
 133         pe_resource_t *rsc = node->details->remote_rsc;
 134 
 135         if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 136             crm_notice("Not fencing remote node %s "
 137                        "(otherwise would because %s): connection is unmanaged",
 138                        pe__node_name(node), reason);
 139         } else if(node->details->remote_requires_reset == FALSE) {
 140             node->details->remote_requires_reset = TRUE;
 141             crm_warn("Remote node %s %s: %s",
 142                      pe__node_name(node),
 143                      pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 144                      reason);
 145         }
 146         node->details->unclean = TRUE;
 147         // No need to apply `priority-fencing-delay` for remote nodes
 148         pe_fence_op(node, NULL, TRUE, reason, FALSE, data_set);
 149 
 150     } else if (node->details->unclean) {
 151         crm_trace("Cluster node %s %s because %s",
 152                   pe__node_name(node),
 153                   pe_can_fence(data_set, node)? "would also be fenced" : "also is unclean",
 154                   reason);
 155 
 156     } else {
 157         crm_warn("Cluster node %s %s: %s",
 158                  pe__node_name(node),
 159                  pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 160                  reason);
 161         node->details->unclean = TRUE;
 162         pe_fence_op(node, NULL, TRUE, reason, priority_delay, data_set);
 163     }
 164 }
 165 
 166 // @TODO xpaths can't handle templates, rules, or id-refs
 167 
 168 // XPath fragment matching an nvpair whose name is PCMK_STONITH_PROVIDES or
     // XML_RSC_ATTR_REQUIRES and whose value is PCMK__VALUE_UNFENCING
 169 #define XPATH_UNFENCING_NVPAIR XML_CIB_TAG_NVPAIR                \
 170     "[(@" XML_NVPAIR_ATTR_NAME "='" PCMK_STONITH_PROVIDES "'"    \
 171     "or @" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_REQUIRES "') " \
 172     "and @" XML_NVPAIR_ATTR_VALUE "='" PCMK__VALUE_UNFENCING "']"
 173 
 174 // XPath matching unfencing in rsc_defaults or any resource: the union ("|") of
     // such an nvpair in a meta-attribute set at any depth below the resources
     // section, and in a meta-attribute set directly below the
     // XML_CIB_TAG_RSCCONFIG section. unpack_config() passes this expression to
     // set_if_xpath() to set pe_flag_enable_unfencing.
 175 #define XPATH_ENABLE_UNFENCING \
 176     "/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RESOURCES   \
 177     "//" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR                                               \
 178     "|/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RSCCONFIG  \
 179     "/" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR
 180 
 181 static void
 182 set_if_xpath(uint64_t flag, const char *xpath, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 183 {
 184     xmlXPathObjectPtr result = NULL;
 185 
 186     if (!pcmk_is_set(data_set->flags, flag)) {
 187         result = xpath_search(data_set->input, xpath);
 188         if (result && (numXpathResults(result) > 0)) {
 189             pe__set_working_set_flags(data_set, flag);
 190         }
 191         freeXpathObject(result);
 192     }
 193 }
 194 
 195 gboolean
 196 unpack_config(xmlNode * config, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 197 {
 198     const char *value = NULL;
 199     GHashTable *config_hash = pcmk__strkey_table(free, free);
 200 
 201     pe_rule_eval_data_t rule_data = {
 202         .node_hash = NULL,
 203         .role = RSC_ROLE_UNKNOWN,
 204         .now = data_set->now,
 205         .match_data = NULL,
 206         .rsc_data = NULL,
 207         .op_data = NULL
 208     };
 209 
 210     data_set->config_hash = config_hash;
 211 
 212     pe__unpack_dataset_nvpairs(config, XML_CIB_TAG_PROPSET, &rule_data, config_hash,
 213                                CIB_OPTIONS_FIRST, FALSE, data_set);
 214 
 215     verify_pe_options(data_set->config_hash);
 216 
 217     set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
 218     if (!pcmk_is_set(data_set->flags, pe_flag_startup_probes)) {
 219         crm_info("Startup probes: disabled (dangerous)");
 220     }
 221 
 222     value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
 223     if (value && crm_is_true(value)) {
 224         crm_info("Watchdog-based self-fencing will be performed via SBD if "
 225                  "fencing is required and stonith-watchdog-timeout is nonzero");
 226         pe__set_working_set_flags(data_set, pe_flag_have_stonith_resource);
 227     }
 228 
 229     /* Set certain flags via xpath here, so they can be used before the relevant
 230      * configuration sections are unpacked.
 231      */
 232     set_if_xpath(pe_flag_enable_unfencing, XPATH_ENABLE_UNFENCING, data_set);
 233 
 234     value = pe_pref(data_set->config_hash, "stonith-timeout");
 235     data_set->stonith_timeout = (int) crm_parse_interval_spec(value);
 236     crm_debug("STONITH timeout: %d", data_set->stonith_timeout);
 237 
 238     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 239     crm_debug("STONITH of failed nodes is %s",
 240               pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)? "enabled" : "disabled");
 241 
 242     data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
 243     if (!strcmp(data_set->stonith_action, "poweroff")) {
 244         pe_warn_once(pe_wo_poweroff,
 245                      "Support for stonith-action of 'poweroff' is deprecated "
 246                      "and will be removed in a future release (use 'off' instead)");
 247         data_set->stonith_action = "off";
 248     }
 249     crm_trace("STONITH will %s nodes", data_set->stonith_action);
 250 
 251     set_config_flag(data_set, "concurrent-fencing", pe_flag_concurrent_fencing);
 252     crm_debug("Concurrent fencing is %s",
 253               pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)? "enabled" : "disabled");
 254 
 255     value = pe_pref(data_set->config_hash,
 256                     XML_CONFIG_ATTR_PRIORITY_FENCING_DELAY);
 257     if (value) {
 258         data_set->priority_fencing_delay = crm_parse_interval_spec(value) / 1000;
 259         crm_trace("Priority fencing delay is %ds", data_set->priority_fencing_delay);
 260     }
 261 
 262     set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
 263     crm_debug("Stop all active resources: %s",
 264               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_stop_everything)));
 265 
 266     set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
 267     if (pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)) {
 268         crm_debug("Cluster is symmetric" " - resources can run anywhere by default");
 269     }
 270 
 271     value = pe_pref(data_set->config_hash, "no-quorum-policy");
 272 
 273     if (pcmk__str_eq(value, "ignore", pcmk__str_casei)) {
 274         data_set->no_quorum_policy = no_quorum_ignore;
 275 
 276     } else if (pcmk__str_eq(value, "freeze", pcmk__str_casei)) {
 277         data_set->no_quorum_policy = no_quorum_freeze;
 278 
 279     } else if (pcmk__str_eq(value, "demote", pcmk__str_casei)) {
 280         data_set->no_quorum_policy = no_quorum_demote;
 281 
 282     } else if (pcmk__str_eq(value, "suicide", pcmk__str_casei)) {
 283         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 284             int do_panic = 0;
 285 
 286             crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC,
 287                                   &do_panic);
 288             if (do_panic || pcmk_is_set(data_set->flags, pe_flag_have_quorum)) {
 289                 data_set->no_quorum_policy = no_quorum_suicide;
 290             } else {
 291                 crm_notice("Resetting no-quorum-policy to 'stop': cluster has never had quorum");
 292                 data_set->no_quorum_policy = no_quorum_stop;
 293             }
 294         } else {
 295             pcmk__config_err("Resetting no-quorum-policy to 'stop' because "
 296                              "fencing is disabled");
 297             data_set->no_quorum_policy = no_quorum_stop;
 298         }
 299 
 300     } else {
 301         data_set->no_quorum_policy = no_quorum_stop;
 302     }
 303 
 304     switch (data_set->no_quorum_policy) {
 305         case no_quorum_freeze:
 306             crm_debug("On loss of quorum: Freeze resources");
 307             break;
 308         case no_quorum_stop:
 309             crm_debug("On loss of quorum: Stop ALL resources");
 310             break;
 311         case no_quorum_demote:
 312             crm_debug("On loss of quorum: "
 313                       "Demote promotable resources and stop other resources");
 314             break;
 315         case no_quorum_suicide:
 316             crm_notice("On loss of quorum: Fence all remaining nodes");
 317             break;
 318         case no_quorum_ignore:
 319             crm_notice("On loss of quorum: Ignore");
 320             break;
 321     }
 322 
 323     set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
 324     crm_trace("Orphan resources are %s",
 325               pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)? "stopped" : "ignored");
 326 
 327     set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
 328     crm_trace("Orphan resource actions are %s",
 329               pcmk_is_set(data_set->flags, pe_flag_stop_action_orphans)? "stopped" : "ignored");
 330 
 331     value = pe_pref(data_set->config_hash, "remove-after-stop");
 332     if (value != NULL) {
 333         if (crm_is_true(value)) {
 334             pe__set_working_set_flags(data_set, pe_flag_remove_after_stop);
 335 #ifndef PCMK__COMPAT_2_0
 336             pe_warn_once(pe_wo_remove_after,
 337                          "Support for the remove-after-stop cluster property is"
 338                          " deprecated and will be removed in a future release");
 339 #endif
 340         } else {
 341             pe__clear_working_set_flags(data_set, pe_flag_remove_after_stop);
 342         }
 343     }
 344 
 345     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
 346     crm_trace("Maintenance mode: %s",
 347               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_maintenance_mode)));
 348 
 349     set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
 350     crm_trace("Start failures are %s",
 351               pcmk_is_set(data_set->flags, pe_flag_start_failure_fatal)? "always fatal" : "handled by failcount");
 352 
 353     if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 354         set_config_flag(data_set, "startup-fencing", pe_flag_startup_fencing);
 355     }
 356     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 357         crm_trace("Unseen nodes will be fenced");
 358     } else {
 359         pe_warn_once(pe_wo_blind, "Blind faith: not fencing unseen nodes");
 360     }
 361 
 362     pe__unpack_node_health_scores(data_set);
 363 
 364     data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
 365     crm_trace("Placement strategy: %s", data_set->placement_strategy);
 366 
 367     set_config_flag(data_set, "shutdown-lock", pe_flag_shutdown_lock);
 368     crm_trace("Resources will%s be locked to cleanly shut down nodes",
 369               (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)? "" : " not"));
 370     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
 371         value = pe_pref(data_set->config_hash,
 372                         XML_CONFIG_ATTR_SHUTDOWN_LOCK_LIMIT);
 373         data_set->shutdown_lock = crm_parse_interval_spec(value) / 1000;
 374         crm_trace("Shutdown locks expire after %us", data_set->shutdown_lock);
 375     }
 376 
 377     return TRUE;
 378 }
 379 
 380 pe_node_t *
 381 pe_create_node(const char *id, const char *uname, const char *type,
     /* [previous][next][first][last][top][bottom][index][help] */
 382                const char *score, pe_working_set_t * data_set)
 383 {
 384     pe_node_t *new_node = NULL;
 385 
 386     if (pe_find_node(data_set->nodes, uname) != NULL) {
 387         pcmk__config_warn("More than one node entry has name '%s'", uname);
 388     }
 389 
 390     new_node = calloc(1, sizeof(pe_node_t));
 391     if (new_node == NULL) {
 392         return NULL;
 393     }
 394 
 395     new_node->weight = char2score(score);
 396     new_node->fixed = FALSE;
 397     new_node->details = calloc(1, sizeof(struct pe_node_shared_s));
 398 
 399     if (new_node->details == NULL) {
 400         free(new_node);
 401         return NULL;
 402     }
 403 
 404     crm_trace("Creating node for entry %s/%s", uname, id);
 405     new_node->details->id = id;
 406     new_node->details->uname = uname;
 407     new_node->details->online = FALSE;
 408     new_node->details->shutdown = FALSE;
 409     new_node->details->rsc_discovery_enabled = TRUE;
 410     new_node->details->running_rsc = NULL;
 411     new_node->details->data_set = data_set;
 412 
 413     if (pcmk__str_eq(type, "member", pcmk__str_null_matches | pcmk__str_casei)) {
 414         new_node->details->type = node_member;
 415 
 416     } else if (pcmk__str_eq(type, "remote", pcmk__str_casei)) {
 417         new_node->details->type = node_remote;
 418         pe__set_working_set_flags(data_set, pe_flag_have_remote_nodes);
 419 
 420     } else {
 421         /* @COMPAT 'ping' is the default for backward compatibility, but it
 422          * should be changed to 'member' at a compatibility break
 423          */
 424         if (!pcmk__str_eq(type, "ping", pcmk__str_casei)) {
 425             pcmk__config_warn("Node %s has unrecognized type '%s', "
 426                               "assuming 'ping'", pcmk__s(uname, "without name"),
 427                               type);
 428         }
 429         pe_warn_once(pe_wo_ping_node,
 430                      "Support for nodes of type 'ping' (such as %s) is "
 431                      "deprecated and will be removed in a future release",
 432                      pcmk__s(uname, "unnamed node"));
 433         new_node->details->type = node_ping;
 434     }
 435 
 436     new_node->details->attrs = pcmk__strkey_table(free, free);
 437 
 438     if (pe__is_guest_or_remote_node(new_node)) {
 439         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 440                             strdup("remote"));
 441     } else {
 442         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 443                             strdup("cluster"));
 444     }
 445 
 446     new_node->details->utilization = pcmk__strkey_table(free, free);
 447     new_node->details->digest_cache = pcmk__strkey_table(free,
 448                                                           pe__free_digests);
 449 
 450     data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node,
 451                                            pe__cmp_node_name);
 452     return new_node;
 453 }
 454 
 455 static const char *
 456 expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, pe_working_set_t *data)
     /* [previous][next][first][last][top][bottom][index][help] */
 457 {
 458     xmlNode *attr_set = NULL;
 459     xmlNode *attr = NULL;
 460 
 461     const char *container_id = ID(xml_obj);
 462     const char *remote_name = NULL;
 463     const char *remote_server = NULL;
 464     const char *remote_port = NULL;
 465     const char *connect_timeout = "60s";
 466     const char *remote_allow_migrate=NULL;
 467     const char *is_managed = NULL;
 468 
 469     for (attr_set = pcmk__xe_first_child(xml_obj); attr_set != NULL;
 470          attr_set = pcmk__xe_next(attr_set)) {
 471 
 472         if (!pcmk__str_eq((const char *)attr_set->name, XML_TAG_META_SETS,
 473                           pcmk__str_casei)) {
 474             continue;
 475         }
 476 
 477         for (attr = pcmk__xe_first_child(attr_set); attr != NULL;
 478              attr = pcmk__xe_next(attr)) {
 479             const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
 480             const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);
 481 
 482             if (pcmk__str_eq(name, XML_RSC_ATTR_REMOTE_NODE, pcmk__str_casei)) {
 483                 remote_name = value;
 484             } else if (pcmk__str_eq(name, "remote-addr", pcmk__str_casei)) {
 485                 remote_server = value;
 486             } else if (pcmk__str_eq(name, "remote-port", pcmk__str_casei)) {
 487                 remote_port = value;
 488             } else if (pcmk__str_eq(name, "remote-connect-timeout", pcmk__str_casei)) {
 489                 connect_timeout = value;
 490             } else if (pcmk__str_eq(name, "remote-allow-migrate", pcmk__str_casei)) {
 491                 remote_allow_migrate=value;
 492             } else if (pcmk__str_eq(name, XML_RSC_ATTR_MANAGED, pcmk__str_casei)) {
 493                 is_managed = value;
 494             }
 495         }
 496     }
 497 
 498     if (remote_name == NULL) {
 499         return NULL;
 500     }
 501 
 502     if (pe_find_resource(data->resources, remote_name) != NULL) {
 503         return NULL;
 504     }
 505 
 506     pe_create_remote_xml(parent, remote_name, container_id,
 507                          remote_allow_migrate, is_managed,
 508                          connect_timeout, remote_server, remote_port);
 509     return remote_name;
 510 }
 511 
 512 static void
 513 handle_startup_fencing(pe_working_set_t *data_set, pe_node_t *new_node)
     /* [previous][next][first][last][top][bottom][index][help] */
 514 {
 515     if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
 516         /* Ignore fencing for remote nodes that don't have a connection resource
 517          * associated with them. This happens when remote node entries get left
 518          * in the nodes section after the connection resource is removed.
 519          */
 520         return;
 521     }
 522 
 523     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 524         // All nodes are unclean until we've seen their status entry
 525         new_node->details->unclean = TRUE;
 526 
 527     } else {
 528         // Blind faith ...
 529         new_node->details->unclean = FALSE;
 530     }
 531 
 532     /* We need to be able to determine if a node's status section
 533      * exists or not separate from whether the node is unclean. */
 534     new_node->details->unseen = TRUE;
 535 }
 536 
 537 gboolean
 538 unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 539 {
 540     xmlNode *xml_obj = NULL;
 541     pe_node_t *new_node = NULL;
 542     const char *id = NULL;
 543     const char *uname = NULL;
 544     const char *type = NULL;
 545     const char *score = NULL;
 546 
 547     pe_rule_eval_data_t rule_data = {
 548         .node_hash = NULL,
 549         .role = RSC_ROLE_UNKNOWN,
 550         .now = data_set->now,
 551         .match_data = NULL,
 552         .rsc_data = NULL,
 553         .op_data = NULL
 554     };
 555 
 556     for (xml_obj = pcmk__xe_first_child(xml_nodes); xml_obj != NULL;
 557          xml_obj = pcmk__xe_next(xml_obj)) {
 558 
 559         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, pcmk__str_none)) {
 560             new_node = NULL;
 561 
 562             id = crm_element_value(xml_obj, XML_ATTR_ID);
 563             uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
 564             type = crm_element_value(xml_obj, XML_ATTR_TYPE);
 565             score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
 566             crm_trace("Processing node %s/%s", uname, id);
 567 
 568             if (id == NULL) {
 569                 pcmk__config_err("Ignoring <" XML_CIB_TAG_NODE
 570                                  "> entry in configuration without id");
 571                 continue;
 572             }
 573             new_node = pe_create_node(id, uname, type, score, data_set);
 574 
 575             if (new_node == NULL) {
 576                 return FALSE;
 577             }
 578 
 579             handle_startup_fencing(data_set, new_node);
 580 
 581             add_node_attrs(xml_obj, new_node, FALSE, data_set);
 582             pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_UTILIZATION, &rule_data,
 583                                        new_node->details->utilization, NULL,
 584                                        FALSE, data_set);
 585 
 586             crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
 587         }
 588     }
 589 
 590     if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
 591         crm_info("Creating a fake local node");
 592         pe_create_node(data_set->localhost, data_set->localhost, NULL, 0,
 593                        data_set);
 594     }
 595 
 596     return TRUE;
 597 }
 598 
 599 static void
 600 setup_container(pe_resource_t * rsc, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 601 {
 602     const char *container_id = NULL;
 603 
 604     if (rsc->children) {
 605         g_list_foreach(rsc->children, (GFunc) setup_container, data_set);
 606         return;
 607     }
 608 
 609     container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
 610     if (container_id && !pcmk__str_eq(container_id, rsc->id, pcmk__str_casei)) {
 611         pe_resource_t *container = pe_find_resource(data_set->resources, container_id);
 612 
 613         if (container) {
 614             rsc->container = container;
 615             pe__set_resource_flags(container, pe_rsc_is_container);
 616             container->fillers = g_list_append(container->fillers, rsc);
 617             pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
 618         } else {
 619             pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
 620         }
 621     }
 622 }
 623 
 624 gboolean
 625 unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 626 {
 627     xmlNode *xml_obj = NULL;
 628 
 629     /* Create remote nodes and guest nodes from the resource configuration
 630      * before unpacking resources.
 631      */
 632     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 633          xml_obj = pcmk__xe_next(xml_obj)) {
 634 
 635         const char *new_node_id = NULL;
 636 
 637         /* Check for remote nodes, which are defined by ocf:pacemaker:remote
 638          * primitives.
 639          */
 640         if (xml_contains_remote_node(xml_obj)) {
 641             new_node_id = ID(xml_obj);
 642             /* The "pe_find_node" check is here to make sure we don't iterate over
 643              * an expanded node that has already been added to the node list. */
 644             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 645                 crm_trace("Found remote node %s defined by resource %s",
 646                           new_node_id, ID(xml_obj));
 647                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 648                                data_set);
 649             }
 650             continue;
 651         }
 652 
 653         /* Check for guest nodes, which are defined by special meta-attributes
 654          * of a primitive of any type (for example, VirtualDomain or Xen).
 655          */
 656         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RESOURCE, pcmk__str_none)) {
 657             /* This will add an ocf:pacemaker:remote primitive to the
 658              * configuration for the guest node's connection, to be unpacked
 659              * later.
 660              */
 661             new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, data_set);
 662             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 663                 crm_trace("Found guest node %s in resource %s",
 664                           new_node_id, ID(xml_obj));
 665                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 666                                data_set);
 667             }
 668             continue;
 669         }
 670 
 671         /* Check for guest nodes inside a group. Clones are currently not
 672          * supported as guest nodes.
 673          */
 674         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_GROUP, pcmk__str_none)) {
 675             xmlNode *xml_obj2 = NULL;
 676             for (xml_obj2 = pcmk__xe_first_child(xml_obj); xml_obj2 != NULL;
 677                  xml_obj2 = pcmk__xe_next(xml_obj2)) {
 678 
 679                 new_node_id = expand_remote_rsc_meta(xml_obj2, xml_resources, data_set);
 680 
 681                 if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 682                     crm_trace("Found guest node %s in resource %s inside group %s",
 683                               new_node_id, ID(xml_obj2), ID(xml_obj));
 684                     pe_create_node(new_node_id, new_node_id, "remote", NULL,
 685                                    data_set);
 686                 }
 687             }
 688         }
 689     }
 690     return TRUE;
 691 }
 692 
 693 /* Call this after all the nodes and resources have been
 694  * unpacked, but before the status section is read.
 695  *
 696  * A remote node's online status is reflected by the state
 697  * of the remote node's connection resource. We need to link
 698  * the remote node to this connection resource so we can have
 699  * easy access to the connection resource during the scheduler calculations.
 700  */
 701 static void
 702 link_rsc2remotenode(pe_working_set_t *data_set, pe_resource_t *new_rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
 703 {
 704     pe_node_t *remote_node = NULL;
 705 
 706     if (new_rsc->is_remote_node == FALSE) {
 707         return;
 708     }
 709 
 710     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 711         /* remote_nodes and remote_resources are not linked in quick location calculations */
 712         return;
 713     }
 714 
 715     remote_node = pe_find_node(data_set->nodes, new_rsc->id);
 716     CRM_CHECK(remote_node != NULL, return);
 717 
 718     pe_rsc_trace(new_rsc, "Linking remote connection resource %s to %s",
 719                  new_rsc->id, pe__node_name(remote_node));
 720     remote_node->details->remote_rsc = new_rsc;
 721 
 722     if (new_rsc->container == NULL) {
 723         /* Handle start-up fencing for remote nodes (as opposed to guest nodes)
 724          * the same as is done for cluster nodes.
 725          */
 726         handle_startup_fencing(data_set, remote_node);
 727 
 728     } else {
 729         /* pe_create_node() marks the new node as "remote" or "cluster"; now
 730          * that we know the node is a guest node, update it correctly.
 731          */
 732         g_hash_table_replace(remote_node->details->attrs, strdup(CRM_ATTR_KIND),
 733                              strdup("container"));
 734     }
 735 }
 736 
 737 static void
 738 destroy_tag(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 739 {
 740     pe_tag_t *tag = data;
 741 
 742     if (tag) {
 743         free(tag->id);
 744         g_list_free_full(tag->refs, free);
 745         free(tag);
 746     }
 747 }
 748 
 749 /*!
 750  * \internal
 751  * \brief Parse configuration XML for resource information
 752  *
 753  * \param[in]     xml_resources  Top of resource configuration XML
 754  * \param[in,out] data_set       Where to put resource information
 755  *
 756  * \return TRUE
 757  *
 758  * \note unpack_remote_nodes() MUST be called before this, so that the nodes can
 759  *       be used when pe__unpack_resource() calls resource_location()
 760  */
 761 gboolean
 762 unpack_resources(xmlNode * xml_resources, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 763 {
 764     xmlNode *xml_obj = NULL;
 765     GList *gIter = NULL;
 766 
 767     data_set->template_rsc_sets = pcmk__strkey_table(free, destroy_tag);
 768 
 769     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 770          xml_obj = pcmk__xe_next(xml_obj)) {
 771 
 772         pe_resource_t *new_rsc = NULL;
 773         const char *id = ID(xml_obj);
 774 
 775         if (pcmk__str_empty(id)) {
 776             pcmk__config_err("Ignoring <%s> resource without ID",
 777                              crm_element_name(xml_obj));
 778             continue;
 779         }
 780 
 781         if (pcmk__str_eq((const char *) xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE,
 782                          pcmk__str_none)) {
 783             if (g_hash_table_lookup_extended(data_set->template_rsc_sets, id,
 784                                              NULL, NULL) == FALSE) {
 785                 /* Record the template's ID for the knowledge of its existence anyway. */
 786                 g_hash_table_insert(data_set->template_rsc_sets, strdup(id), NULL);
 787             }
 788             continue;
 789         }
 790 
 791         crm_trace("Unpacking <%s id='%s'>", crm_element_name(xml_obj), id);
 792         if (pe__unpack_resource(xml_obj, &new_rsc, NULL,
 793                                 data_set) == pcmk_rc_ok) {
 794             data_set->resources = g_list_append(data_set->resources, new_rsc);
 795             pe_rsc_trace(new_rsc, "Added resource %s", new_rsc->id);
 796 
 797         } else {
 798             pcmk__config_err("Ignoring <%s> resource '%s' "
 799                              "because configuration is invalid",
 800                              crm_element_name(xml_obj), id);
 801         }
 802     }
 803 
 804     for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
 805         pe_resource_t *rsc = (pe_resource_t *) gIter->data;
 806 
 807         setup_container(rsc, data_set);
 808         link_rsc2remotenode(data_set, rsc);
 809     }
 810 
 811     data_set->resources = g_list_sort(data_set->resources,
 812                                       pe__cmp_rsc_priority);
 813     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 814         /* Ignore */
 815 
 816     } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
 817                && !pcmk_is_set(data_set->flags, pe_flag_have_stonith_resource)) {
 818 
 819         pcmk__config_err("Resource start-up disabled since no STONITH resources have been defined");
 820         pcmk__config_err("Either configure some or disable STONITH with the stonith-enabled option");
 821         pcmk__config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
 822     }
 823 
 824     return TRUE;
 825 }
 826 
 827 gboolean
 828 unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 829 {
 830     xmlNode *xml_tag = NULL;
 831 
 832     data_set->tags = pcmk__strkey_table(free, destroy_tag);
 833 
 834     for (xml_tag = pcmk__xe_first_child(xml_tags); xml_tag != NULL;
 835          xml_tag = pcmk__xe_next(xml_tag)) {
 836 
 837         xmlNode *xml_obj_ref = NULL;
 838         const char *tag_id = ID(xml_tag);
 839 
 840         if (!pcmk__str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, pcmk__str_none)) {
 841             continue;
 842         }
 843 
 844         if (tag_id == NULL) {
 845             pcmk__config_err("Ignoring <%s> without " XML_ATTR_ID,
 846                              crm_element_name(xml_tag));
 847             continue;
 848         }
 849 
 850         for (xml_obj_ref = pcmk__xe_first_child(xml_tag); xml_obj_ref != NULL;
 851              xml_obj_ref = pcmk__xe_next(xml_obj_ref)) {
 852 
 853             const char *obj_ref = ID(xml_obj_ref);
 854 
 855             if (!pcmk__str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, pcmk__str_none)) {
 856                 continue;
 857             }
 858 
 859             if (obj_ref == NULL) {
 860                 pcmk__config_err("Ignoring <%s> for tag '%s' without " XML_ATTR_ID,
 861                                  crm_element_name(xml_obj_ref), tag_id);
 862                 continue;
 863             }
 864 
 865             if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
 866                 return FALSE;
 867             }
 868         }
 869     }
 870 
 871     return TRUE;
 872 }
 873 
 874 /* The ticket state section:
 875  * "/cib/status/tickets/ticket_state" */
 876 static gboolean
 877 unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 878 {
 879     const char *ticket_id = NULL;
 880     const char *granted = NULL;
 881     const char *last_granted = NULL;
 882     const char *standby = NULL;
 883     xmlAttrPtr xIter = NULL;
 884 
 885     pe_ticket_t *ticket = NULL;
 886 
 887     ticket_id = ID(xml_ticket);
 888     if (pcmk__str_empty(ticket_id)) {
 889         return FALSE;
 890     }
 891 
 892     crm_trace("Processing ticket state for %s", ticket_id);
 893 
 894     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
 895     if (ticket == NULL) {
 896         ticket = ticket_new(ticket_id, data_set);
 897         if (ticket == NULL) {
 898             return FALSE;
 899         }
 900     }
 901 
 902     for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
 903         const char *prop_name = (const char *)xIter->name;
 904         const char *prop_value = crm_element_value(xml_ticket, prop_name);
 905 
 906         if (pcmk__str_eq(prop_name, XML_ATTR_ID, pcmk__str_none)) {
 907             continue;
 908         }
 909         g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
 910     }
 911 
 912     granted = g_hash_table_lookup(ticket->state, "granted");
 913     if (granted && crm_is_true(granted)) {
 914         ticket->granted = TRUE;
 915         crm_info("We have ticket '%s'", ticket->id);
 916     } else {
 917         ticket->granted = FALSE;
 918         crm_info("We do not have ticket '%s'", ticket->id);
 919     }
 920 
 921     last_granted = g_hash_table_lookup(ticket->state, "last-granted");
 922     if (last_granted) {
 923         long long last_granted_ll;
 924 
 925         pcmk__scan_ll(last_granted, &last_granted_ll, 0LL);
 926         ticket->last_granted = (time_t) last_granted_ll;
 927     }
 928 
 929     standby = g_hash_table_lookup(ticket->state, "standby");
 930     if (standby && crm_is_true(standby)) {
 931         ticket->standby = TRUE;
 932         if (ticket->granted) {
 933             crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
 934         }
 935     } else {
 936         ticket->standby = FALSE;
 937     }
 938 
 939     crm_trace("Done with ticket state for %s", ticket_id);
 940 
 941     return TRUE;
 942 }
 943 
 944 static gboolean
 945 unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 946 {
 947     xmlNode *xml_obj = NULL;
 948 
 949     for (xml_obj = pcmk__xe_first_child(xml_tickets); xml_obj != NULL;
 950          xml_obj = pcmk__xe_next(xml_obj)) {
 951 
 952         if (!pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, pcmk__str_none)) {
 953             continue;
 954         }
 955         unpack_ticket_state(xml_obj, data_set);
 956     }
 957 
 958     return TRUE;
 959 }
 960 
 961 static void
 962 unpack_handle_remote_attrs(pe_node_t *this_node, xmlNode *state, pe_working_set_t * data_set) 
     /* [previous][next][first][last][top][bottom][index][help] */
 963 {
 964     const char *resource_discovery_enabled = NULL;
 965     xmlNode *attrs = NULL;
 966     pe_resource_t *rsc = NULL;
 967 
 968     if (!pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
 969         return;
 970     }
 971 
 972     if ((this_node == NULL) || !pe__is_guest_or_remote_node(this_node)) {
 973         return;
 974     }
 975     crm_trace("Processing Pacemaker Remote node %s", pe__node_name(this_node));
 976 
 977     pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_MAINTENANCE),
 978                        &(this_node->details->remote_maintenance), 0);
 979 
 980     rsc = this_node->details->remote_rsc;
 981     if (this_node->details->remote_requires_reset == FALSE) {
 982         this_node->details->unclean = FALSE;
 983         this_node->details->unseen = FALSE;
 984     }
 985     attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
 986     add_node_attrs(attrs, this_node, TRUE, data_set);
 987 
 988     if (pe__shutdown_requested(this_node)) {
 989         crm_info("%s is shutting down", pe__node_name(this_node));
 990         this_node->details->shutdown = TRUE;
 991     }
 992  
 993     if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
 994         crm_info("%s is in standby mode", pe__node_name(this_node));
 995         this_node->details->standby = TRUE;
 996     }
 997 
 998     if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance")) ||
 999         ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed))) {
1000         crm_info("%s is in maintenance mode", pe__node_name(this_node));
1001         this_node->details->maintenance = TRUE;
1002     }
1003 
1004     resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
1005     if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
1006         if (pe__is_remote_node(this_node)
1007             && !pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1008             crm_warn("Ignoring " XML_NODE_ATTR_RSC_DISCOVERY
1009                      " attribute on Pacemaker Remote node %s"
1010                      " because fencing is disabled",
1011                      pe__node_name(this_node));
1012         } else {
1013             /* This is either a remote node with fencing enabled, or a guest
1014              * node. We don't care whether fencing is enabled when fencing guest
1015              * nodes, because they are "fenced" by recovering their containing
1016              * resource.
1017              */
1018             crm_info("%s has resource discovery disabled",
1019                      pe__node_name(this_node));
1020             this_node->details->rsc_discovery_enabled = FALSE;
1021         }
1022     }
1023 }
1024 
1025 /*!
1026  * \internal
1027  * \brief Unpack a cluster node's transient attributes
1028  *
1029  * \param[in] state     CIB node state XML
1030  * \param[in] node      Cluster node whose attributes are being unpacked
1031  * \param[in] data_set  Cluster working set
1032  */
1033 static void
1034 unpack_transient_attributes(xmlNode *state, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1035                             pe_working_set_t *data_set)
1036 {
1037     const char *discovery = NULL;
1038     xmlNode *attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
1039 
1040     add_node_attrs(attrs, node, TRUE, data_set);
1041 
1042     if (crm_is_true(pe_node_attribute_raw(node, "standby"))) {
1043         crm_info("%s is in standby mode", pe__node_name(node));
1044         node->details->standby = TRUE;
1045     }
1046 
1047     if (crm_is_true(pe_node_attribute_raw(node, "maintenance"))) {
1048         crm_info("%s is in maintenance mode", pe__node_name(node));
1049         node->details->maintenance = TRUE;
1050     }
1051 
1052     discovery = pe_node_attribute_raw(node, XML_NODE_ATTR_RSC_DISCOVERY);
1053     if ((discovery != NULL) && !crm_is_true(discovery)) {
1054         crm_warn("Ignoring " XML_NODE_ATTR_RSC_DISCOVERY
1055                  " attribute for %s because disabling resource discovery "
1056                  "is not allowed for cluster nodes", pe__node_name(node));
1057     }
1058 }
1059 
1060 /*!
1061  * \internal
1062  * \brief Unpack a node state entry (first pass)
1063  *
1064  * Unpack one node state entry from status. This unpacks information from the
1065  * node_state element itself and node attributes inside it, but not the
1066  * resource history inside it. Multiple passes through the status are needed to
1067  * fully unpack everything.
1068  *
1069  * \param[in] state     CIB node state XML
1070  * \param[in] data_set  Cluster working set
1071  */
1072 static void
1073 unpack_node_state(xmlNode *state, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1074 {
1075     const char *id = NULL;
1076     const char *uname = NULL;
1077     pe_node_t *this_node = NULL;
1078 
1079     id = crm_element_value(state, XML_ATTR_ID);
1080     if (id == NULL) {
1081         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1082                  XML_ATTR_ID);
1083         return;
1084     }
1085 
1086     uname = crm_element_value(state, XML_ATTR_UNAME);
1087     if (uname == NULL) {
1088         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1089                  XML_ATTR_UNAME);
1090         return;
1091     }
1092 
1093     this_node = pe_find_node_any(data_set->nodes, id, uname);
1094     if (this_node == NULL) {
1095         pcmk__config_warn("Ignoring recorded node state for '%s' because "
1096                           "it is no longer in the configuration", uname);
1097         return;
1098     }
1099 
1100     if (pe__is_guest_or_remote_node(this_node)) {
1101         /* We can't determine the online status of Pacemaker Remote nodes until
1102          * after all resource history has been unpacked. In this first pass, we
1103          * do need to mark whether the node has been fenced, as this plays a
1104          * role during unpacking cluster node resource state.
1105          */
1106         pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_FENCED),
1107                            &(this_node->details->remote_was_fenced), 0);
1108         return;
1109     }
1110 
1111     unpack_transient_attributes(state, this_node, data_set);
1112 
1113     /* Provisionally mark this cluster node as clean. We have at least seen it
1114      * in the current cluster's lifetime.
1115      */
1116     this_node->details->unclean = FALSE;
1117     this_node->details->unseen = FALSE;
1118 
1119     crm_trace("Determining online status of cluster node %s (id %s)",
1120               pe__node_name(this_node), id);
1121     determine_online_status(state, this_node, data_set);
1122 
1123     if (!pcmk_is_set(data_set->flags, pe_flag_have_quorum)
1124         && this_node->details->online
1125         && (data_set->no_quorum_policy == no_quorum_suicide)) {
1126         /* Everything else should flow from this automatically
1127          * (at least until the scheduler becomes able to migrate off
1128          * healthy resources)
1129          */
1130         pe_fence_node(data_set, this_node, "cluster does not have quorum",
1131                       FALSE);
1132     }
1133 }
1134 
1135 /*!
1136  * \internal
1137  * \brief Unpack nodes' resource history as much as possible
1138  *
1139  * Unpack as many nodes' resource history as possible in one pass through the
1140  * status. We need to process Pacemaker Remote nodes' connections/containers
1141  * before unpacking their history; the connection/container history will be
1142  * in another node's history, so it might take multiple passes to unpack
1143  * everything.
1144  *
1145  * \param[in] status    CIB XML status section
1146  * \param[in] fence     If true, treat any not-yet-unpacked nodes as unseen
1147  * \param[in] data_set  Cluster working set
1148  *
1149  * \return Standard Pacemaker return code (specifically pcmk_rc_ok if done,
1150  *         or EAGAIN if more unpacking remains to be done)
1151  */
1152 static int
1153 unpack_node_history(xmlNode *status, bool fence, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1154 {
1155     int rc = pcmk_rc_ok;
1156 
1157     // Loop through all node_state entries in CIB status
1158     for (xmlNode *state = first_named_child(status, XML_CIB_TAG_STATE);
1159          state != NULL; state = crm_next_same_xml(state)) {
1160 
1161         const char *id = ID(state);
1162         const char *uname = crm_element_value(state, XML_ATTR_UNAME);
1163         pe_node_t *this_node = NULL;
1164 
1165         if ((id == NULL) || (uname == NULL)) {
1166             // Warning already logged in first pass through status section
1167             crm_trace("Not unpacking resource history from malformed "
1168                       XML_CIB_TAG_STATE " without id and/or uname");
1169             continue;
1170         }
1171 
1172         this_node = pe_find_node_any(data_set->nodes, id, uname);
1173         if (this_node == NULL) {
1174             // Warning already logged in first pass through status section
1175             crm_trace("Not unpacking resource history for node %s because "
1176                       "no longer in configuration", id);
1177             continue;
1178         }
1179 
1180         if (this_node->details->unpacked) {
1181             crm_trace("Not unpacking resource history for node %s because "
1182                       "already unpacked", id);
1183             continue;
1184         }
1185 
1186         if (fence) {
1187             // We're processing all remaining nodes
1188 
1189         } else if (pe__is_guest_node(this_node)) {
1190             /* We can unpack a guest node's history only after we've unpacked
1191              * other resource history to the point that we know that the node's
1192              * connection and containing resource are both up.
1193              */
1194             pe_resource_t *rsc = this_node->details->remote_rsc;
1195 
1196             if ((rsc == NULL) || (rsc->role != RSC_ROLE_STARTED)
1197                 || (rsc->container->role != RSC_ROLE_STARTED)) {
1198                 crm_trace("Not unpacking resource history for guest node %s "
1199                           "because container and connection are not known to "
1200                           "be up", id);
1201                 continue;
1202             }
1203 
1204         } else if (pe__is_remote_node(this_node)) {
1205             /* We can unpack a remote node's history only after we've unpacked
1206              * other resource history to the point that we know that the node's
1207              * connection is up, with the exception of when shutdown locks are
1208              * in use.
1209              */
1210             pe_resource_t *rsc = this_node->details->remote_rsc;
1211 
1212             if ((rsc == NULL)
1213                 || (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)
1214                     && (rsc->role != RSC_ROLE_STARTED))) {
1215                 crm_trace("Not unpacking resource history for remote node %s "
1216                           "because connection is not known to be up", id);
1217                 continue;
1218             }
1219 
1220         /* If fencing and shutdown locks are disabled and we're not processing
1221          * unseen nodes, then we don't want to unpack offline nodes until online
1222          * nodes have been unpacked. This allows us to number active clone
1223          * instances first.
1224          */
1225         } else if (!pcmk_any_flags_set(data_set->flags, pe_flag_stonith_enabled
1226                                                         |pe_flag_shutdown_lock)
1227                    && !this_node->details->online) {
1228             crm_trace("Not unpacking resource history for offline "
1229                       "cluster node %s", id);
1230             continue;
1231         }
1232 
1233         if (pe__is_guest_or_remote_node(this_node)) {
1234             determine_remote_online_status(data_set, this_node);
1235             unpack_handle_remote_attrs(this_node, state, data_set);
1236         }
1237 
1238         crm_trace("Unpacking resource history for %snode %s",
1239                   (fence? "unseen " : ""), id);
1240 
1241         this_node->details->unpacked = TRUE;
1242         unpack_node_lrm(this_node, state, data_set);
1243 
1244         rc = EAGAIN; // Other node histories might depend on this one
1245     }
1246     return rc;
1247 }
1248 
1249 /* remove nodes that are down, stopping */
1250 /* create positive rsc_to_node constraints between resources and the nodes they are running on */
1251 /* anything else? */
1252 gboolean
1253 unpack_status(xmlNode * status, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1254 {
1255     xmlNode *state = NULL;
1256 
1257     crm_trace("Beginning unpack");
1258 
1259     if (data_set->tickets == NULL) {
1260         data_set->tickets = pcmk__strkey_table(free, destroy_ticket);
1261     }
1262 
1263     for (state = pcmk__xe_first_child(status); state != NULL;
1264          state = pcmk__xe_next(state)) {
1265 
1266         if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, pcmk__str_none)) {
1267             unpack_tickets_state((xmlNode *) state, data_set);
1268 
1269         } else if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
1270             unpack_node_state(state, data_set);
1271         }
1272     }
1273 
1274     while (unpack_node_history(status, FALSE, data_set) == EAGAIN) {
1275         crm_trace("Another pass through node resource histories is needed");
1276     }
1277 
1278     // Now catch any nodes we didn't see
1279     unpack_node_history(status,
1280                         pcmk_is_set(data_set->flags, pe_flag_stonith_enabled),
1281                         data_set);
1282 
1283     /* Now that we know where resources are, we can schedule stops of containers
1284      * with failed bundle connections
1285      */
1286     if (data_set->stop_needed != NULL) {
1287         for (GList *item = data_set->stop_needed; item; item = item->next) {
1288             pe_resource_t *container = item->data;
1289             pe_node_t *node = pe__current_node(container);
1290 
1291             if (node) {
1292                 stop_action(container, node, FALSE);
1293             }
1294         }
1295         g_list_free(data_set->stop_needed);
1296         data_set->stop_needed = NULL;
1297     }
1298 
1299     /* Now that we know status of all Pacemaker Remote connections and nodes,
1300      * we can stop connections for node shutdowns, and check the online status
1301      * of remote/guest nodes that didn't have any node history to unpack.
1302      */
1303     for (GList *gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
1304         pe_node_t *this_node = gIter->data;
1305 
1306         if (!pe__is_guest_or_remote_node(this_node)) {
1307             continue;
1308         }
1309         if (this_node->details->shutdown
1310             && (this_node->details->remote_rsc != NULL)) {
1311             pe__set_next_role(this_node->details->remote_rsc, RSC_ROLE_STOPPED,
1312                               "remote shutdown");
1313         }
1314         if (!this_node->details->unpacked) {
1315             determine_remote_online_status(data_set, this_node);
1316         }
1317     }
1318 
1319     return TRUE;
1320 }
1321 
1322 static gboolean
1323 determine_online_status_no_fencing(pe_working_set_t * data_set, xmlNode * node_state,
     /* [previous][next][first][last][top][bottom][index][help] */
1324                                    pe_node_t * this_node)
1325 {
1326     gboolean online = FALSE;
1327     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1328     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1329     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1330     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1331 
1332     if (!crm_is_true(in_cluster)) {
1333         crm_trace("Node is down: in_cluster=%s",
1334                   pcmk__s(in_cluster, "<null>"));
1335 
1336     } else if (pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei)) {
1337         if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1338             online = TRUE;
1339         } else {
1340             crm_debug("Node is not ready to run resources: %s", join);
1341         }
1342 
1343     } else if (this_node->details->expected_up == FALSE) {
1344         crm_trace("Controller is down: "
1345                   "in_cluster=%s is_peer=%s join=%s expected=%s",
1346                   pcmk__s(in_cluster, "<null>"), pcmk__s(is_peer, "<null>"),
1347                   pcmk__s(join, "<null>"), pcmk__s(exp_state, "<null>"));
1348 
1349     } else {
1350         /* mark it unclean */
1351         pe_fence_node(data_set, this_node, "peer is unexpectedly down", FALSE);
1352         crm_info("in_cluster=%s is_peer=%s join=%s expected=%s",
1353                  pcmk__s(in_cluster, "<null>"), pcmk__s(is_peer, "<null>"),
1354                  pcmk__s(join, "<null>"), pcmk__s(exp_state, "<null>"));
1355     }
1356     return online;
1357 }
1358 
1359 static gboolean
1360 determine_online_status_fencing(pe_working_set_t * data_set, xmlNode * node_state,
     /* [previous][next][first][last][top][bottom][index][help] */
1361                                 pe_node_t * this_node)
1362 {
1363     gboolean online = FALSE;
1364     gboolean do_terminate = FALSE;
1365     bool crmd_online = FALSE;
1366     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1367     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1368     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1369     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1370     const char *terminate = pe_node_attribute_raw(this_node, "terminate");
1371 
1372 /*
1373   - XML_NODE_IN_CLUSTER    ::= true|false
1374   - XML_NODE_IS_PEER       ::= online|offline
1375   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
1376   - XML_NODE_EXPECTED      ::= member|down
1377 */
1378 
1379     if (crm_is_true(terminate)) {
1380         do_terminate = TRUE;
1381 
1382     } else if (terminate != NULL && strlen(terminate) > 0) {
1383         /* could be a time() value */
1384         char t = terminate[0];
1385 
1386         if (t != '0' && isdigit(t)) {
1387             do_terminate = TRUE;
1388         }
1389     }
1390 
1391     crm_trace("%s: in_cluster=%s is_peer=%s join=%s expected=%s term=%d",
1392               pe__node_name(this_node), pcmk__s(in_cluster, "<null>"),
1393               pcmk__s(is_peer, "<null>"), pcmk__s(join, "<null>"),
1394               pcmk__s(exp_state, "<null>"), do_terminate);
1395 
1396     online = crm_is_true(in_cluster);
1397     crmd_online = pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei);
1398     if (exp_state == NULL) {
1399         exp_state = CRMD_JOINSTATE_DOWN;
1400     }
1401 
1402     if (this_node->details->shutdown) {
1403         crm_debug("%s is shutting down", pe__node_name(this_node));
1404 
1405         /* Slightly different criteria since we can't shut down a dead peer */
1406         online = crmd_online;
1407 
1408     } else if (in_cluster == NULL) {
1409         pe_fence_node(data_set, this_node, "peer has not been seen by the cluster", FALSE);
1410 
1411     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_NACK, pcmk__str_casei)) {
1412         pe_fence_node(data_set, this_node,
1413                       "peer failed Pacemaker membership criteria", FALSE);
1414 
1415     } else if (do_terminate == FALSE && pcmk__str_eq(exp_state, CRMD_JOINSTATE_DOWN, pcmk__str_casei)) {
1416 
1417         if (crm_is_true(in_cluster) || crmd_online) {
1418             crm_info("- %s is not ready to run resources",
1419                      pe__node_name(this_node));
1420             this_node->details->standby = TRUE;
1421             this_node->details->pending = TRUE;
1422 
1423         } else {
1424             crm_trace("%s is down or still coming up",
1425                       pe__node_name(this_node));
1426         }
1427 
1428     } else if (do_terminate && pcmk__str_eq(join, CRMD_JOINSTATE_DOWN, pcmk__str_casei)
1429                && crm_is_true(in_cluster) == FALSE && !crmd_online) {
1430         crm_info("%s was just shot", pe__node_name(this_node));
1431         online = FALSE;
1432 
1433     } else if (crm_is_true(in_cluster) == FALSE) {
1434         // Consider `priority-fencing-delay` for lost nodes
1435         pe_fence_node(data_set, this_node, "peer is no longer part of the cluster", TRUE);
1436 
1437     } else if (!crmd_online) {
1438         pe_fence_node(data_set, this_node, "peer process is no longer available", FALSE);
1439 
1440         /* Everything is running at this point, now check join state */
1441     } else if (do_terminate) {
1442         pe_fence_node(data_set, this_node, "termination was requested", FALSE);
1443 
1444     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1445         crm_info("%s is active", pe__node_name(this_node));
1446 
1447     } else if (pcmk__strcase_any_of(join, CRMD_JOINSTATE_PENDING, CRMD_JOINSTATE_DOWN, NULL)) {
1448         crm_info("%s is not ready to run resources", pe__node_name(this_node));
1449         this_node->details->standby = TRUE;
1450         this_node->details->pending = TRUE;
1451 
1452     } else {
1453         pe_fence_node(data_set, this_node, "peer was in an unknown state", FALSE);
1454         crm_warn("%s: in-cluster=%s is-peer=%s join=%s expected=%s term=%d shutdown=%d",
1455                  pe__node_name(this_node), pcmk__s(in_cluster, "<null>"),
1456                  pcmk__s(is_peer, "<null>"), pcmk__s(join, "<null>"),
1457                  pcmk__s(exp_state, "<null>"), do_terminate,
1458                  this_node->details->shutdown);
1459     }
1460 
1461     return online;
1462 }
1463 
1464 static void
1465 determine_remote_online_status(pe_working_set_t * data_set, pe_node_t * this_node)
     /* [previous][next][first][last][top][bottom][index][help] */
1466 {
1467     pe_resource_t *rsc = this_node->details->remote_rsc;
1468     pe_resource_t *container = NULL;
1469     pe_node_t *host = NULL;
1470 
1471     /* If there is a node state entry for a (former) Pacemaker Remote node
1472      * but no resource creating that node, the node's connection resource will
1473      * be NULL. Consider it an offline remote node in that case.
1474      */
1475     if (rsc == NULL) {
1476         this_node->details->online = FALSE;
1477         goto remote_online_done;
1478     }
1479 
1480     container = rsc->container;
1481 
1482     if (container && pcmk__list_of_1(rsc->running_on)) {
1483         host = rsc->running_on->data;
1484     }
1485 
1486     /* If the resource is currently started, mark it online. */
1487     if (rsc->role == RSC_ROLE_STARTED) {
1488         crm_trace("%s node %s presumed ONLINE because connection resource is started",
1489                   (container? "Guest" : "Remote"), this_node->details->id);
1490         this_node->details->online = TRUE;
1491     }
1492 
1493     /* consider this node shutting down if transitioning start->stop */
1494     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
1495         crm_trace("%s node %s shutting down because connection resource is stopping",
1496                   (container? "Guest" : "Remote"), this_node->details->id);
1497         this_node->details->shutdown = TRUE;
1498     }
1499 
1500     /* Now check all the failure conditions. */
1501     if(container && pcmk_is_set(container->flags, pe_rsc_failed)) {
1502         crm_trace("Guest node %s UNCLEAN because guest resource failed",
1503                   this_node->details->id);
1504         this_node->details->online = FALSE;
1505         this_node->details->remote_requires_reset = TRUE;
1506 
1507     } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
1508         crm_trace("%s node %s OFFLINE because connection resource failed",
1509                   (container? "Guest" : "Remote"), this_node->details->id);
1510         this_node->details->online = FALSE;
1511 
1512     } else if (rsc->role == RSC_ROLE_STOPPED
1513         || (container && container->role == RSC_ROLE_STOPPED)) {
1514 
1515         crm_trace("%s node %s OFFLINE because its resource is stopped",
1516                   (container? "Guest" : "Remote"), this_node->details->id);
1517         this_node->details->online = FALSE;
1518         this_node->details->remote_requires_reset = FALSE;
1519 
1520     } else if (host && (host->details->online == FALSE)
1521                && host->details->unclean) {
1522         crm_trace("Guest node %s UNCLEAN because host is unclean",
1523                   this_node->details->id);
1524         this_node->details->online = FALSE;
1525         this_node->details->remote_requires_reset = TRUE;
1526     }
1527 
1528 remote_online_done:
1529     crm_trace("Remote node %s online=%s",
1530         this_node->details->id, this_node->details->online ? "TRUE" : "FALSE");
1531 }
1532 
1533 static void
1534 determine_online_status(xmlNode * node_state, pe_node_t * this_node, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1535 {
1536     gboolean online = FALSE;
1537     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1538 
1539     CRM_CHECK(this_node != NULL, return);
1540 
1541     this_node->details->shutdown = FALSE;
1542     this_node->details->expected_up = FALSE;
1543 
1544     if (pe__shutdown_requested(this_node)) {
1545         this_node->details->shutdown = TRUE;
1546 
1547     } else if (pcmk__str_eq(exp_state, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1548         this_node->details->expected_up = TRUE;
1549     }
1550 
1551     if (this_node->details->type == node_ping) {
1552         this_node->details->unclean = FALSE;
1553         online = FALSE;         /* As far as resource management is concerned,
1554                                  * the node is safely offline.
1555                                  * Anyone caught abusing this logic will be shot
1556                                  */
1557 
1558     } else if (!pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1559         online = determine_online_status_no_fencing(data_set, node_state, this_node);
1560 
1561     } else {
1562         online = determine_online_status_fencing(data_set, node_state, this_node);
1563     }
1564 
1565     if (online) {
1566         this_node->details->online = TRUE;
1567 
1568     } else {
1569         /* remove node from contention */
1570         this_node->fixed = TRUE;
1571         this_node->weight = -INFINITY;
1572     }
1573 
1574     if (online && this_node->details->shutdown) {
1575         /* don't run resources here */
1576         this_node->fixed = TRUE;
1577         this_node->weight = -INFINITY;
1578     }
1579 
1580     if (this_node->details->type == node_ping) {
1581         crm_info("%s is not a Pacemaker node", pe__node_name(this_node));
1582 
1583     } else if (this_node->details->unclean) {
1584         pe_proc_warn("%s is unclean", pe__node_name(this_node));
1585 
1586     } else if (this_node->details->online) {
1587         crm_info("%s is %s", pe__node_name(this_node),
1588                  this_node->details->shutdown ? "shutting down" :
1589                  this_node->details->pending ? "pending" :
1590                  this_node->details->standby ? "standby" :
1591                  this_node->details->maintenance ? "maintenance" : "online");
1592 
1593     } else {
1594         crm_trace("%s is offline", pe__node_name(this_node));
1595     }
1596 }
1597 
/*!
 * \internal
 * \brief Locate where a resource ID's base name ends, ignoring a clone suffix
 *
 * A clone suffix is a colon followed by zero or more decimal digits at the
 * very end of the ID (for example, the ":0" in "rsc:0"). The first character
 * of the ID is never considered part of a suffix.
 *
 * \param[in] id  Resource ID to check
 *
 * \return Pointer to the last character of the base name within \p id, or
 *         NULL if \p id is NULL or empty
 * \note If the ID ends with a bare colon (no digits after it), the returned
 *       pointer refers to that colon.
 */
const char *
pe_base_name_end(const char *id)
{
    const char *last = NULL;
    const char *pos = NULL;

    if ((id == NULL) || (*id == '\0')) {
        return NULL;
    }
    last = id + strlen(id) - 1;

    // Walk backward over any trailing digits, never reaching the first char
    pos = last;
    while ((pos > id) && (*pos >= '0') && (*pos <= '9')) {
        --pos;
    }

    // Only a colon directly before the trailing digits marks a clone suffix
    if ((pos > id) && (*pos == ':')) {
        return (pos == last)? pos : (pos - 1);
    }
    return last;
}
1635 
/*!
 * \internal
 * \brief Copy a resource ID without its clone suffix
 *
 * \param[in] last_rsc_id  Resource ID to check
 *
 * \return Newly allocated string holding the base name of \p last_rsc_id
 * \note The caller is responsible for freeing the result with free().
 *       Failure is handled by assertion, so callers can assume the result
 *       is not NULL.
 */
char *
clone_strip(const char *last_rsc_id)
{
    char *basename = NULL;
    size_t base_len = 0;
    const char *end = pe_base_name_end(last_rsc_id);

    CRM_ASSERT(end);

    // end points at the last character to keep, hence the +1
    base_len = (size_t) (end - last_rsc_id) + 1;
    basename = strndup(last_rsc_id, base_len);
    CRM_ASSERT(basename);
    return basename;
}
1657 
/*!
 * \internal
 * \brief Get the name of the first instance of a cloned resource
 *
 * \param[in] last_rsc_id  Resource ID to check
 *
 * \return Pointer to newly allocated string with resource's base name plus :0
 * \note It is the caller's responsibility to free() the result.
 *       This asserts on error, so callers can assume result is not NULL.
 */
char *
clone_zero(const char *last_rsc_id)
{
    const char *end = pe_base_name_end(last_rsc_id);
    size_t base_name_len = 0;
    char *zero = NULL;

    /* Validate end before doing any pointer arithmetic with it: it is NULL
     * when the ID is NULL or empty.
     */
    CRM_ASSERT(end);
    base_name_len = (size_t) (end - last_rsc_id) + 1;

    // Room for the base name, ':', '0', and the terminator (zeroed by calloc)
    zero = calloc(base_name_len + 3, sizeof(char));
    CRM_ASSERT(zero);
    memcpy(zero, last_rsc_id, base_name_len);
    zero[base_name_len] = ':';
    zero[base_name_len + 1] = '0';
    return zero;
}
1683 
1684 static pe_resource_t *
1685 create_fake_resource(const char *rsc_id, xmlNode * rsc_entry, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1686 {
1687     pe_resource_t *rsc = NULL;
1688     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
1689 
1690     copy_in_properties(xml_rsc, rsc_entry);
1691     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
1692     crm_log_xml_debug(xml_rsc, "Orphan resource");
1693 
1694     if (pe__unpack_resource(xml_rsc, &rsc, NULL, data_set) != pcmk_rc_ok) {
1695         return NULL;
1696     }
1697 
1698     if (xml_contains_remote_node(xml_rsc)) {
1699         pe_node_t *node;
1700 
1701         crm_debug("Detected orphaned remote node %s", rsc_id);
1702         node = pe_find_node(data_set->nodes, rsc_id);
1703         if (node == NULL) {
1704                 node = pe_create_node(rsc_id, rsc_id, "remote", NULL, data_set);
1705         }
1706         link_rsc2remotenode(data_set, rsc);
1707 
1708         if (node) {
1709             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
1710             node->details->shutdown = TRUE;
1711         }
1712     }
1713 
1714     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
1715         /* This orphaned rsc needs to be mapped to a container. */
1716         crm_trace("Detected orphaned container filler %s", rsc_id);
1717         pe__set_resource_flags(rsc, pe_rsc_orphan_container_filler);
1718     }
1719     pe__set_resource_flags(rsc, pe_rsc_orphan);
1720     data_set->resources = g_list_append(data_set->resources, rsc);
1721     return rsc;
1722 }
1723 
1724 /*!
1725  * \internal
1726  * \brief Create orphan instance for anonymous clone resource history
1727  */
1728 static pe_resource_t *
1729 create_anonymous_orphan(pe_resource_t *parent, const char *rsc_id,
     /* [previous][next][first][last][top][bottom][index][help] */
1730                         pe_node_t *node, pe_working_set_t *data_set)
1731 {
1732     pe_resource_t *top = pe__create_clone_child(parent, data_set);
1733 
1734     // find_rsc() because we might be a cloned group
1735     pe_resource_t *orphan = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
1736 
1737     pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s",
1738                  top->id, parent->id, rsc_id, pe__node_name(node));
1739     return orphan;
1740 }
1741 
1742 /*!
1743  * \internal
1744  * \brief Check a node for an instance of an anonymous clone
1745  *
1746  * Return a child instance of the specified anonymous clone, in order of
1747  * preference: (1) the instance running on the specified node, if any;
1748  * (2) an inactive instance (i.e. within the total of clone-max instances);
1749  * (3) a newly created orphan (i.e. clone-max instances are already active).
1750  *
1751  * \param[in] data_set  Cluster information
1752  * \param[in] node      Node on which to check for instance
1753  * \param[in] parent    Clone to check
1754  * \param[in] rsc_id    Name of cloned resource in history (without instance)
1755  */
1756 static pe_resource_t *
1757 find_anonymous_clone(pe_working_set_t * data_set, pe_node_t * node, pe_resource_t * parent,
     /* [previous][next][first][last][top][bottom][index][help] */
1758                      const char *rsc_id)
1759 {
1760     GList *rIter = NULL;
1761     pe_resource_t *rsc = NULL;
1762     pe_resource_t *inactive_instance = NULL;
1763     gboolean skip_inactive = FALSE;
1764 
1765     CRM_ASSERT(parent != NULL);
1766     CRM_ASSERT(pe_rsc_is_clone(parent));
1767     CRM_ASSERT(!pcmk_is_set(parent->flags, pe_rsc_unique));
1768 
1769     // Check for active (or partially active, for cloned groups) instance
1770     pe_rsc_trace(parent, "Looking for %s on %s in %s",
1771                  rsc_id, pe__node_name(node), parent->id);
1772     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1773         GList *locations = NULL;
1774         pe_resource_t *child = rIter->data;
1775 
1776         /* Check whether this instance is already known to be active or pending
1777          * anywhere, at this stage of unpacking. Because this function is called
1778          * for a resource before the resource's individual operation history
1779          * entries are unpacked, locations will generally not contain the
1780          * desired node.
1781          *
1782          * However, there are three exceptions:
1783          * (1) when child is a cloned group and we have already unpacked the
1784          *     history of another member of the group on the same node;
1785          * (2) when we've already unpacked the history of another numbered
1786          *     instance on the same node (which can happen if globally-unique
1787          *     was flipped from true to false); and
1788          * (3) when we re-run calculations on the same data set as part of a
1789          *     simulation.
1790          */
1791         child->fns->location(child, &locations, 2);
1792         if (locations) {
1793             /* We should never associate the same numbered anonymous clone
1794              * instance with multiple nodes, and clone instances can't migrate,
1795              * so there must be only one location, regardless of history.
1796              */
1797             CRM_LOG_ASSERT(locations->next == NULL);
1798 
1799             if (((pe_node_t *)locations->data)->details == node->details) {
1800                 /* This child instance is active on the requested node, so check
1801                  * for a corresponding configured resource. We use find_rsc()
1802                  * instead of child because child may be a cloned group, and we
1803                  * need the particular member corresponding to rsc_id.
1804                  *
1805                  * If the history entry is orphaned, rsc will be NULL.
1806                  */
1807                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1808                 if (rsc) {
1809                     /* If there are multiple instance history entries for an
1810                      * anonymous clone in a single node's history (which can
1811                      * happen if globally-unique is switched from true to
1812                      * false), we want to consider the instances beyond the
1813                      * first as orphans, even if there are inactive instance
1814                      * numbers available.
1815                      */
1816                     if (rsc->running_on) {
1817                         crm_notice("Active (now-)anonymous clone %s has "
1818                                    "multiple (orphan) instance histories on %s",
1819                                    parent->id, pe__node_name(node));
1820                         skip_inactive = TRUE;
1821                         rsc = NULL;
1822                     } else {
1823                         pe_rsc_trace(parent, "Resource %s, active", rsc->id);
1824                     }
1825                 }
1826             }
1827             g_list_free(locations);
1828 
1829         } else {
1830             pe_rsc_trace(parent, "Resource %s, skip inactive", child->id);
1831             if (!skip_inactive && !inactive_instance
1832                 && !pcmk_is_set(child->flags, pe_rsc_block)) {
1833                 // Remember one inactive instance in case we don't find active
1834                 inactive_instance = parent->fns->find_rsc(child, rsc_id, NULL,
1835                                                           pe_find_clone);
1836 
1837                 /* ... but don't use it if it was already associated with a
1838                  * pending action on another node
1839                  */
1840                 if (inactive_instance && inactive_instance->pending_node
1841                     && (inactive_instance->pending_node->details != node->details)) {
1842                     inactive_instance = NULL;
1843                 }
1844             }
1845         }
1846     }
1847 
1848     if ((rsc == NULL) && !skip_inactive && (inactive_instance != NULL)) {
1849         pe_rsc_trace(parent, "Resource %s, empty slot", inactive_instance->id);
1850         rsc = inactive_instance;
1851     }
1852 
1853     /* If the resource has "requires" set to "quorum" or "nothing", and we don't
1854      * have a clone instance for every node, we don't want to consume a valid
1855      * instance number for unclean nodes. Such instances may appear to be active
1856      * according to the history, but should be considered inactive, so we can
1857      * start an instance elsewhere. Treat such instances as orphans.
1858      *
1859      * An exception is instances running on guest nodes -- since guest node
1860      * "fencing" is actually just a resource stop, requires shouldn't apply.
1861      *
1862      * @TODO Ideally, we'd use an inactive instance number if it is not needed
1863      * for any clean instances. However, we don't know that at this point.
1864      */
1865     if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_needs_fencing)
1866         && (!node->details->online || node->details->unclean)
1867         && !pe__is_guest_node(node)
1868         && !pe__is_universal_clone(parent, data_set)) {
1869 
1870         rsc = NULL;
1871     }
1872 
1873     if (rsc == NULL) {
1874         rsc = create_anonymous_orphan(parent, rsc_id, node, data_set);
1875         pe_rsc_trace(parent, "Resource %s, orphan", rsc->id);
1876     }
1877     return rsc;
1878 }
1879 
1880 static pe_resource_t *
1881 unpack_find_resource(pe_working_set_t * data_set, pe_node_t * node, const char *rsc_id,
     /* [previous][next][first][last][top][bottom][index][help] */
1882                      xmlNode * rsc_entry)
1883 {
1884     pe_resource_t *rsc = NULL;
1885     pe_resource_t *parent = NULL;
1886 
1887     crm_trace("looking for %s", rsc_id);
1888     rsc = pe_find_resource(data_set->resources, rsc_id);
1889 
1890     if (rsc == NULL) {
1891         /* If we didn't find the resource by its name in the operation history,
1892          * check it again as a clone instance. Even when clone-max=0, we create
1893          * a single :0 orphan to match against here.
1894          */
1895         char *clone0_id = clone_zero(rsc_id);
1896         pe_resource_t *clone0 = pe_find_resource(data_set->resources, clone0_id);
1897 
1898         if (clone0 && !pcmk_is_set(clone0->flags, pe_rsc_unique)) {
1899             rsc = clone0;
1900             parent = uber_parent(clone0);
1901             crm_trace("%s found as %s (%s)", rsc_id, clone0_id, parent->id);
1902         } else {
1903             crm_trace("%s is not known as %s either (orphan)",
1904                       rsc_id, clone0_id);
1905         }
1906         free(clone0_id);
1907 
1908     } else if (rsc->variant > pe_native) {
1909         crm_trace("Resource history for %s is orphaned because it is no longer primitive",
1910                   rsc_id);
1911         return NULL;
1912 
1913     } else {
1914         parent = uber_parent(rsc);
1915     }
1916 
1917     if (pe_rsc_is_anon_clone(parent)) {
1918 
1919         if (pe_rsc_is_bundled(parent)) {
1920             rsc = pe__find_bundle_replica(parent->parent, node);
1921         } else {
1922             char *base = clone_strip(rsc_id);
1923 
1924             rsc = find_anonymous_clone(data_set, node, parent, base);
1925             free(base);
1926             CRM_ASSERT(rsc != NULL);
1927         }
1928     }
1929 
1930     if (rsc && !pcmk__str_eq(rsc_id, rsc->id, pcmk__str_casei)
1931         && !pcmk__str_eq(rsc_id, rsc->clone_name, pcmk__str_casei)) {
1932 
1933         pcmk__str_update(&rsc->clone_name, rsc_id);
1934         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
1935                      rsc_id, pe__node_name(node), rsc->id,
1936                      (pcmk_is_set(rsc->flags, pe_rsc_orphan)? " (ORPHAN)" : ""));
1937     }
1938     return rsc;
1939 }
1940 
1941 static pe_resource_t *
1942 process_orphan_resource(xmlNode * rsc_entry, pe_node_t * node, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1943 {
1944     pe_resource_t *rsc = NULL;
1945     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
1946 
1947     crm_debug("Detected orphan resource %s on %s", rsc_id, pe__node_name(node));
1948     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
1949     if (rsc == NULL) {
1950         return NULL;
1951     }
1952 
1953     if (!pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)) {
1954         pe__clear_resource_flags(rsc, pe_rsc_managed);
1955 
1956     } else {
1957         CRM_CHECK(rsc != NULL, return NULL);
1958         pe_rsc_trace(rsc, "Added orphan %s", rsc->id);
1959         resource_location(rsc, NULL, -INFINITY, "__orphan_do_not_run__", data_set);
1960     }
1961     return rsc;
1962 }
1963 
1964 static void
1965 process_rsc_state(pe_resource_t * rsc, pe_node_t * node,
     /* [previous][next][first][last][top][bottom][index][help] */
1966                   enum action_fail_response on_fail,
1967                   xmlNode * migrate_op, pe_working_set_t * data_set)
1968 {
1969     pe_node_t *tmpnode = NULL;
1970     char *reason = NULL;
1971     enum action_fail_response save_on_fail = action_fail_ignore;
1972 
1973     CRM_ASSERT(rsc);
1974     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
1975                  rsc->id, role2text(rsc->role), pe__node_name(node),
1976                  fail2text(on_fail));
1977 
1978     /* process current state */
1979     if (rsc->role != RSC_ROLE_UNKNOWN) {
1980         pe_resource_t *iter = rsc;
1981 
1982         while (iter) {
1983             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
1984                 pe_node_t *n = pe__copy_node(node);
1985 
1986                 pe_rsc_trace(rsc, "%s%s%s known on %s",
1987                              rsc->id,
1988                              ((rsc->clone_name == NULL)? "" : " also known as "),
1989                              ((rsc->clone_name == NULL)? "" : rsc->clone_name),
1990                              pe__node_name(n));
1991                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
1992             }
1993             if (pcmk_is_set(iter->flags, pe_rsc_unique)) {
1994                 break;
1995             }
1996             iter = iter->parent;
1997         }
1998     }
1999 
2000     /* If a managed resource is believed to be running, but node is down ... */
2001     if (rsc->role > RSC_ROLE_STOPPED
2002         && node->details->online == FALSE
2003         && node->details->maintenance == FALSE
2004         && pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2005 
2006         gboolean should_fence = FALSE;
2007 
2008         /* If this is a guest node, fence it (regardless of whether fencing is
2009          * enabled, because guest node fencing is done by recovery of the
2010          * container resource rather than by the fencer). Mark the resource
2011          * we're processing as failed. When the guest comes back up, its
2012          * operation history in the CIB will be cleared, freeing the affected
2013          * resource to run again once we are sure we know its state.
2014          */
2015         if (pe__is_guest_node(node)) {
2016             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2017             should_fence = TRUE;
2018 
2019         } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
2020             if (pe__is_remote_node(node) && node->details->remote_rsc
2021                 && !pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
2022 
2023                 /* Setting unseen means that fencing of the remote node will
2024                  * occur only if the connection resource is not going to start
2025                  * somewhere. This allows connection resources on a failed
2026                  * cluster node to move to another node without requiring the
2027                  * remote nodes to be fenced as well.
2028                  */
2029                 node->details->unseen = TRUE;
2030                 reason = crm_strdup_printf("%s is active there (fencing will be"
2031                                            " revoked if remote connection can "
2032                                            "be re-established elsewhere)",
2033                                            rsc->id);
2034             }
2035             should_fence = TRUE;
2036         }
2037 
2038         if (should_fence) {
2039             if (reason == NULL) {
2040                reason = crm_strdup_printf("%s is thought to be active there", rsc->id);
2041             }
2042             pe_fence_node(data_set, node, reason, FALSE);
2043         }
2044         free(reason);
2045     }
2046 
2047     /* In order to calculate priority_fencing_delay correctly, save the failure information and pass it to native_add_running(). */
2048     save_on_fail = on_fail;
2049 
2050     if (node->details->unclean) {
2051         /* No extra processing needed
2052          * Also allows resources to be started again after a node is shot
2053          */
2054         on_fail = action_fail_ignore;
2055     }
2056 
2057     switch (on_fail) {
2058         case action_fail_ignore:
2059             /* nothing to do */
2060             break;
2061 
2062         case action_fail_demote:
2063             pe__set_resource_flags(rsc, pe_rsc_failed);
2064             demote_action(rsc, node, FALSE);
2065             break;
2066 
2067         case action_fail_fence:
2068             /* treat it as if it is still running
2069              * but also mark the node as unclean
2070              */
2071             reason = crm_strdup_printf("%s failed there", rsc->id);
2072             pe_fence_node(data_set, node, reason, FALSE);
2073             free(reason);
2074             break;
2075 
2076         case action_fail_standby:
2077             node->details->standby = TRUE;
2078             node->details->standby_onfail = TRUE;
2079             break;
2080 
2081         case action_fail_block:
2082             /* is_managed == FALSE will prevent any
2083              * actions being sent for the resource
2084              */
2085             pe__clear_resource_flags(rsc, pe_rsc_managed);
2086             pe__set_resource_flags(rsc, pe_rsc_block);
2087             break;
2088 
2089         case action_fail_migrate:
2090             /* make sure it comes up somewhere else
2091              * or not at all
2092              */
2093             resource_location(rsc, node, -INFINITY, "__action_migration_auto__", data_set);
2094             break;
2095 
2096         case action_fail_stop:
2097             pe__set_next_role(rsc, RSC_ROLE_STOPPED, "on-fail=stop");
2098             break;
2099 
2100         case action_fail_recover:
2101             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2102                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2103                 stop_action(rsc, node, FALSE);
2104             }
2105             break;
2106 
2107         case action_fail_restart_container:
2108             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2109             if (rsc->container && pe_rsc_is_bundled(rsc)) {
2110                 /* A bundle's remote connection can run on a different node than
2111                  * the bundle's container. We don't necessarily know where the
2112                  * container is running yet, so remember it and add a stop
2113                  * action for it later.
2114                  */
2115                 data_set->stop_needed = g_list_prepend(data_set->stop_needed,
2116                                                        rsc->container);
2117             } else if (rsc->container) {
2118                 stop_action(rsc->container, node, FALSE);
2119             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2120                 stop_action(rsc, node, FALSE);
2121             }
2122             break;
2123 
2124         case action_fail_reset_remote:
2125             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2126             if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
2127                 tmpnode = NULL;
2128                 if (rsc->is_remote_node) {
2129                     tmpnode = pe_find_node(data_set->nodes, rsc->id);
2130                 }
2131                 if (tmpnode &&
2132                     pe__is_remote_node(tmpnode) &&
2133                     tmpnode->details->remote_was_fenced == 0) {
2134 
2135                     /* The remote connection resource failed in a way that
2136                      * should result in fencing the remote node.
2137                      */
2138                     pe_fence_node(data_set, tmpnode,
2139                                   "remote connection is unrecoverable", FALSE);
2140                 }
2141             }
2142 
2143             /* require the stop action regardless if fencing is occurring or not. */
2144             if (rsc->role > RSC_ROLE_STOPPED) {
2145                 stop_action(rsc, node, FALSE);
2146             }
2147 
2148             /* if reconnect delay is in use, prevent the connection from exiting the
2149              * "STOPPED" role until the failure is cleared by the delay timeout. */
2150             if (rsc->remote_reconnect_ms) {
2151                 pe__set_next_role(rsc, RSC_ROLE_STOPPED, "remote reset");
2152             }
2153             break;
2154     }
2155 
2156     /* ensure a remote-node connection failure forces an unclean remote-node
2157      * to be fenced. By setting unseen = FALSE, the remote-node failure will
2158      * result in a fencing operation regardless if we're going to attempt to 
2159      * reconnect to the remote-node in this transition or not. */
2160     if (pcmk_is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
2161         tmpnode = pe_find_node(data_set->nodes, rsc->id);
2162         if (tmpnode && tmpnode->details->unclean) {
2163             tmpnode->details->unseen = FALSE;
2164         }
2165     }
2166 
2167     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2168         if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
2169             if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2170                 pcmk__config_warn("Detected active orphan %s running on %s",
2171                                   rsc->id, pe__node_name(node));
2172             } else {
2173                 pcmk__config_warn("Resource '%s' must be stopped manually on "
2174                                   "%s because cluster is configured not to "
2175                                   "stop active orphans",
2176                                   rsc->id, pe__node_name(node));
2177             }
2178         }
2179 
2180         native_add_running(rsc, node, data_set, (save_on_fail != action_fail_ignore));
2181         switch (on_fail) {
2182             case action_fail_ignore:
2183                 break;
2184             case action_fail_demote:
2185             case action_fail_block:
2186                 pe__set_resource_flags(rsc, pe_rsc_failed);
2187                 break;
2188             default:
2189                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2190                 break;
2191         }
2192 
2193     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
2194         /* Only do this for older status sections that included instance numbers
2195          * Otherwise stopped instances will appear as orphans
2196          */
2197         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
2198         free(rsc->clone_name);
2199         rsc->clone_name = NULL;
2200 
2201     } else {
2202         GList *possible_matches = pe__resource_actions(rsc, node, RSC_STOP,
2203                                                        FALSE);
2204         GList *gIter = possible_matches;
2205 
2206         for (; gIter != NULL; gIter = gIter->next) {
2207             pe_action_t *stop = (pe_action_t *) gIter->data;
2208 
2209             pe__set_action_flags(stop, pe_action_optional);
2210         }
2211 
2212         g_list_free(possible_matches);
2213     }
2214 
2215     /* A successful stop after migrate_to on the migration source doesn't make
2216      * the partially migrated resource stopped on the migration target.
2217      */
2218     if (rsc->role == RSC_ROLE_STOPPED
2219         && rsc->partial_migration_source
2220         && rsc->partial_migration_source->details == node->details
2221         && rsc->partial_migration_target
2222         && rsc->running_on) {
2223 
2224         rsc->role = RSC_ROLE_STARTED;
2225     }
2226 }
2227 
2228 /* create active recurring operations as optional */
2229 static void
2230 process_recurring(pe_node_t * node, pe_resource_t * rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
2231                   int start_index, int stop_index,
2232                   GList *sorted_op_list, pe_working_set_t * data_set)
2233 {
2234     int counter = -1;
2235     const char *task = NULL;
2236     const char *status = NULL;
2237     GList *gIter = sorted_op_list;
2238 
2239     CRM_ASSERT(rsc);
2240     pe_rsc_trace(rsc, "%s: Start index %d, stop index = %d", rsc->id, start_index, stop_index);
2241 
2242     for (; gIter != NULL; gIter = gIter->next) {
2243         xmlNode *rsc_op = (xmlNode *) gIter->data;
2244 
2245         guint interval_ms = 0;
2246         char *key = NULL;
2247         const char *id = ID(rsc_op);
2248 
2249         counter++;
2250 
2251         if (node->details->online == FALSE) {
2252             pe_rsc_trace(rsc, "Skipping %s on %s: node is offline",
2253                          rsc->id, pe__node_name(node));
2254             break;
2255 
2256             /* Need to check if there's a monitor for role="Stopped" */
2257         } else if (start_index < stop_index && counter <= stop_index) {
2258             pe_rsc_trace(rsc, "Skipping %s on %s: resource is not active",
2259                          id, pe__node_name(node));
2260             continue;
2261 
2262         } else if (counter < start_index) {
2263             pe_rsc_trace(rsc, "Skipping %s on %s: old %d",
2264                          id, pe__node_name(node), counter);
2265             continue;
2266         }
2267 
2268         crm_element_value_ms(rsc_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
2269         if (interval_ms == 0) {
2270             pe_rsc_trace(rsc, "Skipping %s on %s: non-recurring",
2271                          id, pe__node_name(node));
2272             continue;
2273         }
2274 
2275         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2276         if (pcmk__str_eq(status, "-1", pcmk__str_casei)) {
2277             pe_rsc_trace(rsc, "Skipping %s on %s: status",
2278                          id, pe__node_name(node));
2279             continue;
2280         }
2281         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2282         /* create the action */
2283         key = pcmk__op_key(rsc->id, task, interval_ms);
2284         pe_rsc_trace(rsc, "Creating %s on %s", key, pe__node_name(node));
2285         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
2286     }
2287 }
2288 
2289 void
2290 calculate_active_ops(GList *sorted_op_list, int *start_index, int *stop_index)
     /* [previous][next][first][last][top][bottom][index][help] */
2291 {
2292     int counter = -1;
2293     int implied_monitor_start = -1;
2294     int implied_clone_start = -1;
2295     const char *task = NULL;
2296     const char *status = NULL;
2297     GList *gIter = sorted_op_list;
2298 
2299     *stop_index = -1;
2300     *start_index = -1;
2301 
2302     for (; gIter != NULL; gIter = gIter->next) {
2303         xmlNode *rsc_op = (xmlNode *) gIter->data;
2304 
2305         counter++;
2306 
2307         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2308         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2309 
2310         if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)
2311             && pcmk__str_eq(status, "0", pcmk__str_casei)) {
2312             *stop_index = counter;
2313 
2314         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_START, CRMD_ACTION_MIGRATED, NULL)) {
2315             *start_index = counter;
2316 
2317         } else if ((implied_monitor_start <= *stop_index) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
2318             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
2319 
2320             if (pcmk__strcase_any_of(rc, "0", "8", NULL)) {
2321                 implied_monitor_start = counter;
2322             }
2323         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_PROMOTE, CRMD_ACTION_DEMOTE, NULL)) {
2324             implied_clone_start = counter;
2325         }
2326     }
2327 
2328     if (*start_index == -1) {
2329         if (implied_clone_start != -1) {
2330             *start_index = implied_clone_start;
2331         } else if (implied_monitor_start != -1) {
2332             *start_index = implied_monitor_start;
2333         }
2334     }
2335 }
2336 
2337 // If resource history entry has shutdown lock, remember lock node and time
2338 static void
2339 unpack_shutdown_lock(xmlNode *rsc_entry, pe_resource_t *rsc, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
2340                      pe_working_set_t *data_set)
2341 {
2342     time_t lock_time = 0;   // When lock started (i.e. node shutdown time)
2343 
2344     if ((crm_element_value_epoch(rsc_entry, XML_CONFIG_ATTR_SHUTDOWN_LOCK,
2345                                  &lock_time) == pcmk_ok) && (lock_time != 0)) {
2346 
2347         if ((data_set->shutdown_lock > 0)
2348             && (get_effective_time(data_set)
2349                 > (lock_time + data_set->shutdown_lock))) {
2350             pe_rsc_info(rsc, "Shutdown lock for %s on %s expired",
2351                         rsc->id, pe__node_name(node));
2352             pe__clear_resource_history(rsc, node, data_set);
2353         } else {
2354             rsc->lock_node = node;
2355             rsc->lock_time = lock_time;
2356         }
2357     }
2358 }
2359 
2360 /*!
2361  * \internal
2362  * \brief Unpack one lrm_resource entry from a node's CIB status
2363  *
2364  * \param[in] node       Node whose status is being unpacked
2365  * \param[in] rsc_entry  lrm_resource XML being unpacked
2366  * \param[in] data_set   Cluster working set
2367  *
2368  * \return Resource corresponding to the entry, or NULL if no operation history
2369  */
2370 static pe_resource_t *
2371 unpack_lrm_resource(pe_node_t *node, xmlNode *lrm_resource,
     /* [previous][next][first][last][top][bottom][index][help] */
2372                     pe_working_set_t *data_set)
2373 {
2374     GList *gIter = NULL;
2375     int stop_index = -1;
2376     int start_index = -1;
2377     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
2378 
2379     const char *task = NULL;
2380     const char *rsc_id = ID(lrm_resource);
2381 
2382     pe_resource_t *rsc = NULL;
2383     GList *op_list = NULL;
2384     GList *sorted_op_list = NULL;
2385 
2386     xmlNode *migrate_op = NULL;
2387     xmlNode *rsc_op = NULL;
2388     xmlNode *last_failure = NULL;
2389 
2390     enum action_fail_response on_fail = action_fail_ignore;
2391     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
2392 
2393     if (rsc_id == NULL) {
2394         crm_warn("Ignoring malformed " XML_LRM_TAG_RESOURCE
2395                  " entry without id");
2396         return NULL;
2397     }
2398     crm_trace("Unpacking " XML_LRM_TAG_RESOURCE " for %s on %s",
2399               rsc_id, pe__node_name(node));
2400 
2401     // Build a list of individual lrm_rsc_op entries, so we can sort them
2402     for (rsc_op = first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP);
2403          rsc_op != NULL; rsc_op = crm_next_same_xml(rsc_op)) {
2404 
2405         op_list = g_list_prepend(op_list, rsc_op);
2406     }
2407 
2408     if (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2409         if (op_list == NULL) {
2410             // If there are no operations, there is nothing to do
2411             return NULL;
2412         }
2413     }
2414 
2415     /* find the resource */
2416     rsc = unpack_find_resource(data_set, node, rsc_id, lrm_resource);
2417     if (rsc == NULL) {
2418         if (op_list == NULL) {
2419             // If there are no operations, there is nothing to do
2420             return NULL;
2421         } else {
2422             rsc = process_orphan_resource(lrm_resource, node, data_set);
2423         }
2424     }
2425     CRM_ASSERT(rsc != NULL);
2426 
2427     // Check whether the resource is "shutdown-locked" to this node
2428     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2429         unpack_shutdown_lock(lrm_resource, rsc, node, data_set);
2430     }
2431 
2432     /* process operations */
2433     saved_role = rsc->role;
2434     rsc->role = RSC_ROLE_UNKNOWN;
2435     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
2436 
2437     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
2438         xmlNode *rsc_op = (xmlNode *) gIter->data;
2439 
2440         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2441         if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2442             migrate_op = rsc_op;
2443         }
2444 
2445         unpack_rsc_op(rsc, node, rsc_op, &last_failure, &on_fail, data_set);
2446     }
2447 
2448     /* create active recurring operations as optional */
2449     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
2450     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
2451 
2452     /* no need to free the contents */
2453     g_list_free(sorted_op_list);
2454 
2455     process_rsc_state(rsc, node, on_fail, migrate_op, data_set);
2456 
2457     if (get_target_role(rsc, &req_role)) {
2458         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
2459             pe__set_next_role(rsc, req_role, XML_RSC_ATTR_TARGET_ROLE);
2460 
2461         } else if (req_role > rsc->next_role) {
2462             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
2463                         " with requested next role %s",
2464                         rsc->id, role2text(rsc->next_role), role2text(req_role));
2465         }
2466     }
2467 
2468     if (saved_role > rsc->role) {
2469         rsc->role = saved_role;
2470     }
2471 
2472     return rsc;
2473 }
2474 
2475 static void
2476 handle_orphaned_container_fillers(xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
2477 {
2478     xmlNode *rsc_entry = NULL;
2479     for (rsc_entry = pcmk__xe_first_child(lrm_rsc_list); rsc_entry != NULL;
2480          rsc_entry = pcmk__xe_next(rsc_entry)) {
2481 
2482         pe_resource_t *rsc;
2483         pe_resource_t *container;
2484         const char *rsc_id;
2485         const char *container_id;
2486 
2487         if (!pcmk__str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, pcmk__str_casei)) {
2488             continue;
2489         }
2490 
2491         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
2492         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2493         if (container_id == NULL || rsc_id == NULL) {
2494             continue;
2495         }
2496 
2497         container = pe_find_resource(data_set->resources, container_id);
2498         if (container == NULL) {
2499             continue;
2500         }
2501 
2502         rsc = pe_find_resource(data_set->resources, rsc_id);
2503         if (rsc == NULL ||
2504             !pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler) ||
2505             rsc->container != NULL) {
2506             continue;
2507         }
2508 
2509         pe_rsc_trace(rsc, "Mapped container of orphaned resource %s to %s",
2510                      rsc->id, container_id);
2511         rsc->container = container;
2512         container->fillers = g_list_append(container->fillers, rsc);
2513     }
2514 }
2515 
2516 /*!
2517  * \internal
2518  * \brief Unpack one node's lrm status section
2519  *
2520  * \param[in] node      Node whose status is being unpacked
2521  * \param[in] xml       CIB node state XML
2522  * \param[in] data_set  Cluster working set
2523  */
2524 static void
2525 unpack_node_lrm(pe_node_t *node, xmlNode *xml, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
2526 {
2527     bool found_orphaned_container_filler = false;
2528 
2529     // Drill down to lrm_resources section
2530     xml = find_xml_node(xml, XML_CIB_TAG_LRM, FALSE);
2531     if (xml == NULL) {
2532         return;
2533     }
2534     xml = find_xml_node(xml, XML_LRM_TAG_RESOURCES, FALSE);
2535     if (xml == NULL) {
2536         return;
2537     }
2538 
2539     // Unpack each lrm_resource entry
2540     for (xmlNode *rsc_entry = first_named_child(xml, XML_LRM_TAG_RESOURCE);
2541          rsc_entry != NULL; rsc_entry = crm_next_same_xml(rsc_entry)) {
2542 
2543         pe_resource_t *rsc = unpack_lrm_resource(node, rsc_entry, data_set);
2544 
2545         if ((rsc != NULL)
2546             && pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
2547             found_orphaned_container_filler = true;
2548         }
2549     }
2550 
2551     /* Now that all resource state has been unpacked for this node, map any
2552      * orphaned container fillers to their container resource.
2553      */
2554     if (found_orphaned_container_filler) {
2555         handle_orphaned_container_fillers(xml, data_set);
2556     }
2557 }
2558 
2559 static void
2560 set_active(pe_resource_t * rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
2561 {
2562     pe_resource_t *top = uber_parent(rsc);
2563 
2564     if (top && pcmk_is_set(top->flags, pe_rsc_promotable)) {
2565         rsc->role = RSC_ROLE_UNPROMOTED;
2566     } else {
2567         rsc->role = RSC_ROLE_STARTED;
2568     }
2569 }
2570 
2571 static void
2572 set_node_score(gpointer key, gpointer value, gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
2573 {
2574     pe_node_t *node = value;
2575     int *score = user_data;
2576 
2577     node->weight = *score;
2578 }
2579 
2580 static xmlNode *
2581 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
     /* [previous][next][first][last][top][bottom][index][help] */
2582             int target_rc, pe_working_set_t *data_set)
2583 {
2584     GString *xpath = NULL;
2585     xmlNode *xml = NULL;
2586 
2587     CRM_CHECK((resource != NULL) && (op != NULL) && (node != NULL),
2588               return NULL);
2589 
2590     xpath = g_string_sized_new(256);
2591     pcmk__g_strcat(xpath,
2592                    "//" XML_CIB_TAG_STATE "[@" XML_ATTR_UNAME "='", node, "']"
2593                    "//" XML_LRM_TAG_RESOURCE
2594                    "[@" XML_ATTR_ID "='", resource, "']"
2595                    "/" XML_LRM_TAG_RSC_OP "[@" XML_LRM_ATTR_TASK "='", op, "'",
2596                    NULL);
2597 
2598     /* Need to check against transition_magic too? */
2599     if ((source != NULL) && (strcmp(op, CRMD_ACTION_MIGRATE) == 0)) {
2600         pcmk__g_strcat(xpath,
2601                        " and @" XML_LRM_ATTR_MIGRATE_TARGET "='", source, "']",
2602                        NULL);
2603 
2604     } else if ((source != NULL) && (strcmp(op, CRMD_ACTION_MIGRATED) == 0)) {
2605         pcmk__g_strcat(xpath,
2606                        " and @" XML_LRM_ATTR_MIGRATE_SOURCE "='", source, "']",
2607                        NULL);
2608     } else {
2609         g_string_append_c(xpath, ']');
2610     }
2611 
2612     xml = get_xpath_object((const char *) xpath->str, data_set->input,
2613                            LOG_DEBUG);
2614     g_string_free(xpath, TRUE);
2615 
2616     if (xml && target_rc >= 0) {
2617         int rc = PCMK_OCF_UNKNOWN_ERROR;
2618         int status = PCMK_EXEC_ERROR;
2619 
2620         crm_element_value_int(xml, XML_LRM_ATTR_RC, &rc);
2621         crm_element_value_int(xml, XML_LRM_ATTR_OPSTATUS, &status);
2622         if ((rc != target_rc) || (status != PCMK_EXEC_DONE)) {
2623             return NULL;
2624         }
2625     }
2626     return xml;
2627 }
2628 
2629 static xmlNode *
2630 find_lrm_resource(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2631                   pe_working_set_t *data_set)
2632 {
2633     GString *xpath = NULL;
2634     xmlNode *xml = NULL;
2635 
2636     CRM_CHECK((rsc_id != NULL) && (node_name != NULL), return NULL);
2637 
2638     xpath = g_string_sized_new(256);
2639     pcmk__g_strcat(xpath,
2640                    "//" XML_CIB_TAG_STATE
2641                    "[@" XML_ATTR_UNAME "='", node_name, "']"
2642                    "//" XML_LRM_TAG_RESOURCE
2643                    "[@" XML_ATTR_ID "='", rsc_id, "']",
2644                    NULL);
2645 
2646     xml = get_xpath_object((const char *) xpath->str, data_set->input,
2647                            LOG_DEBUG);
2648 
2649     g_string_free(xpath, TRUE);
2650     return xml;
2651 }
2652 
2653 static bool
2654 unknown_on_node(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2655                 pe_working_set_t *data_set)
2656 {
2657     xmlNode *lrm_resource = NULL;
2658 
2659     lrm_resource = find_lrm_resource(rsc_id, node_name, data_set);
2660 
2661     /* If the resource has no lrm_rsc_op history on the node, that means its
2662      * state is unknown there.
2663      */
2664     return (lrm_resource == NULL
2665             || first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP) == NULL);
2666 }
2667 
2668 /*!
2669  * \brief Check whether a probe/monitor indicating the resource was not running
2670  * on a node happened after some event
2671  *
2672  * \param[in] rsc_id    Resource being checked
2673  * \param[in] node_name Node being checked
2674  * \param[in] xml_op    Event that monitor is being compared to
2675  * \param[in] data_set  Cluster working set
2676  *
2677  * \return true if such a monitor happened after event, false otherwise
2678  */
2679 static bool
2680 monitor_not_running_after(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2681                           xmlNode *xml_op, bool same_node,
2682                           pe_working_set_t *data_set)
2683 {
2684     /* Any probe/monitor operation on the node indicating it was not running
2685      * there
2686      */
2687     xmlNode *monitor = find_lrm_op(rsc_id, CRMD_ACTION_STATUS, node_name,
2688                                    NULL, PCMK_OCF_NOT_RUNNING, data_set);
2689 
2690     return (monitor && pe__is_newer_op(monitor, xml_op, same_node) > 0);
2691 }
2692 
2693 /*!
2694  * \brief Check whether any non-monitor operation on a node happened after some
2695  * event
2696  *
2697  * \param[in] rsc_id    Resource being checked
2698  * \param[in] node_name Node being checked
2699  * \param[in] xml_op    Event that non-monitor is being compared to
2700  * \param[in] same_node Whether the operations are on the same node
2701  * \param[in] data_set  Cluster working set
2702  *
2703  * \return true if such a operation happened after event, false otherwise
2704  */
2705 static bool
2706 non_monitor_after(const char *rsc_id, const char *node_name, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
2707                   bool same_node, pe_working_set_t *data_set)
2708 {
2709     xmlNode *lrm_resource = NULL;
2710 
2711     lrm_resource = find_lrm_resource(rsc_id, node_name, data_set);
2712     if (lrm_resource == NULL) {
2713         return false;
2714     }
2715 
2716     for (xmlNode *op = first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP);
2717          op != NULL; op = crm_next_same_xml(op)) {
2718         const char * task = NULL;
2719 
2720         if (op == xml_op) {
2721             continue;
2722         }
2723 
2724         task = crm_element_value(op, XML_LRM_ATTR_TASK);
2725 
2726         if (pcmk__str_any_of(task, CRMD_ACTION_START, CRMD_ACTION_STOP,
2727                              CRMD_ACTION_MIGRATE, CRMD_ACTION_MIGRATED, NULL)
2728             && pe__is_newer_op(op, xml_op, same_node) > 0) {
2729             return true;
2730         }
2731     }
2732 
2733     return false;
2734 }
2735 
2736 /*!
2737  * \brief Check whether the resource has newer state on a node after a migration
2738  * attempt
2739  *
2740  * \param[in] rsc_id       Resource being checked
2741  * \param[in] node_name    Node being checked
2742  * \param[in] migrate_to   Any migrate_to event that is being compared to
2743  * \param[in] migrate_from Any migrate_from event that is being compared to
2744  * \param[in] data_set     Cluster working set
2745  *
2746  * \return true if such a operation happened after event, false otherwise
2747  */
2748 static bool
2749 newer_state_after_migrate(const char *rsc_id, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2750                           xmlNode *migrate_to, xmlNode *migrate_from,
2751                           pe_working_set_t *data_set)
2752 {
2753     xmlNode *xml_op = migrate_to;
2754     const char *source = NULL;
2755     const char *target = NULL;
2756     bool same_node = false;
2757 
2758     if (migrate_from) {
2759         xml_op = migrate_from;
2760     }
2761 
2762     source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2763     target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2764 
2765     /* It's preferred to compare to the migrate event on the same node if
2766      * existing, since call ids are more reliable.
2767      */
2768     if (pcmk__str_eq(node_name, target, pcmk__str_casei)) {
2769         if (migrate_from) {
2770            xml_op = migrate_from;
2771            same_node = true;
2772 
2773         } else {
2774            xml_op = migrate_to;
2775         }
2776 
2777     } else if (pcmk__str_eq(node_name, source, pcmk__str_casei)) {
2778         if (migrate_to) {
2779            xml_op = migrate_to;
2780            same_node = true;
2781 
2782         } else {
2783            xml_op = migrate_from;
2784         }
2785     }
2786 
2787     /* If there's any newer non-monitor operation on the node, or any newer
2788      * probe/monitor operation on the node indicating it was not running there,
2789      * the migration events potentially no longer matter for the node.
2790      */
2791     return non_monitor_after(rsc_id, node_name, xml_op, same_node, data_set)
2792            || monitor_not_running_after(rsc_id, node_name, xml_op, same_node,
2793                                         data_set);
2794 }
2795 
2796 static void
2797 unpack_migrate_to_success(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
2798                           pe_working_set_t *data_set)
2799 {
2800     /* A successful migration sequence is:
2801      *    migrate_to on source node
2802      *    migrate_from on target node
2803      *    stop on source node
2804      *
2805      * But there could be scenarios like (It's easier to produce with cluster
2806      * property batch-limit=1):
2807      *
2808      * - rscA is live-migrating from node1 to node2.
2809      *
2810      * - Before migrate_to on node1 returns, put node2 into standby.
2811      *
2812      * - Transition aborts upon return of successful migrate_to on node1. New
2813      *   transition is going to stop the rscA on both nodes and start it on
2814      *   node1.
2815      *
2816      * - While it is stopping on node1, run something that is going to make
2817      *   the transition abort again like:
2818      *   crm_resource  --resource rscA --ban --node node2
2819      *
2820      * - Transition aborts upon return of stop on node1.
2821      *
2822      * Now although there's a stop on node1, it's still a partial migration and
2823      * rscA is still potentially active on node2.
2824      *
2825      * So even if a migrate_to is followed by a stop, we still need to check
2826      * whether there's a corresponding migrate_from or any newer operation on
2827      * the target.
2828      *
2829      * If no migrate_from has happened, the migration is considered to be
2830      * "partial". If the migrate_from failed, make sure the resource gets
2831      * stopped on both source and target (if up).
2832      *
2833      * If the migrate_to and migrate_from both succeeded (which also implies the
2834      * resource is no longer running on the source), but there is no stop, the
2835      * migration is considered to be "dangling". Schedule a stop on the source
2836      * in this case.
2837      */
2838     int from_rc = 0;
2839     int from_status = 0;
2840     pe_node_t *target_node = NULL;
2841     pe_node_t *source_node = NULL;
2842     xmlNode *migrate_from = NULL;
2843     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2844     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2845     bool source_newer_op = false;
2846     bool target_newer_state = false;
2847 
2848     // Sanity check
2849     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2850 
2851     /* If there's any newer non-monitor operation on the source, this migrate_to
2852      * potentially no longer matters for the source.
2853      */
2854     source_newer_op = non_monitor_after(rsc->id, source, xml_op, true,
2855                                         data_set);
2856 
2857     // Check whether there was a migrate_from action on the target
2858     migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2859                                source, -1, data_set);
2860 
2861     /* Even if there's a newer non-monitor operation on the source, we still
2862      * need to check how this migrate_to might matter for the target.
2863      */
2864     if (source_newer_op && migrate_from) {
2865         return;
2866     }
2867 
2868     /* If the resource has newer state on the target after the migration
2869      * events, this migrate_to no longer matters for the target.
2870      */
2871     target_newer_state = newer_state_after_migrate(rsc->id, target, xml_op,
2872                                                    migrate_from, data_set);
2873 
2874     if (source_newer_op && target_newer_state) {
2875         return;
2876     }
2877 
2878     // Clones are not allowed to migrate, so role can't be promoted
2879     rsc->role = RSC_ROLE_STARTED;
2880 
2881     target_node = pe_find_node(data_set->nodes, target);
2882     source_node = pe_find_node(data_set->nodes, source);
2883 
2884     if (migrate_from) {
2885         crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
2886         crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS, &from_status);
2887         pe_rsc_trace(rsc, "%s op on %s exited with status=%d, rc=%d",
2888                      ID(migrate_from), target, from_status, from_rc);
2889     }
2890 
2891     if (migrate_from && from_rc == PCMK_OCF_OK
2892         && (from_status == PCMK_EXEC_DONE)) {
2893         /* The migrate_to and migrate_from both succeeded, so mark the migration
2894          * as "dangling". This will be used to schedule a stop action on the
2895          * source without affecting the target.
2896          */
2897         pe_rsc_trace(rsc, "Detected dangling migration op: %s on %s", ID(xml_op),
2898                      source);
2899         rsc->role = RSC_ROLE_STOPPED;
2900         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2901 
2902     } else if (migrate_from && (from_status != PCMK_EXEC_PENDING)) { // Failed
2903         /* If the resource has newer state on the target, this migrate_to no
2904          * longer matters for the target.
2905          */
2906         if (!target_newer_state
2907             && target_node && target_node->details->online) {
2908             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2909                          target_node->details->online);
2910             native_add_running(rsc, target_node, data_set, TRUE);
2911 
2912         } else {
2913             /* With the earlier bail logic, migrate_from != NULL here implies
2914              * source_newer_op is false, meaning this migrate_to still matters
2915              * for the source.
2916              * Consider it failed here - forces a restart, prevents migration
2917              */
2918             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2919             pe__clear_resource_flags(rsc, pe_rsc_allow_migrate);
2920         }
2921 
2922     } else { // Pending, or complete but erased
2923         /* If the resource has newer state on the target, this migrate_to no
2924          * longer matters for the target.
2925          */
2926         if (!target_newer_state
2927             && target_node && target_node->details->online) {
2928             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2929                          target_node->details->online);
2930 
2931             native_add_running(rsc, target_node, data_set, FALSE);
2932             if (source_node && source_node->details->online) {
2933                 /* This is a partial migration: the migrate_to completed
2934                  * successfully on the source, but the migrate_from has not
2935                  * completed. Remember the source and target; if the newly
2936                  * chosen target remains the same when we schedule actions
2937                  * later, we may continue with the migration.
2938                  */
2939                 rsc->partial_migration_target = target_node;
2940                 rsc->partial_migration_source = source_node;
2941             }
2942         } else if (!source_newer_op) {
2943             /* This migrate_to matters for the source only if it's the last
2944              * non-monitor operation here.
2945              * Consider it failed here - forces a restart, prevents migration
2946              */
2947             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2948             pe__clear_resource_flags(rsc, pe_rsc_allow_migrate);
2949         }
2950     }
2951 }
2952 
2953 static void
2954 unpack_migrate_to_failure(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
2955                           pe_working_set_t *data_set)
2956 {
2957     xmlNode *target_migrate_from = NULL;
2958     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2959     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2960 
2961     // Sanity check
2962     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2963 
2964     /* If a migration failed, we have to assume the resource is active. Clones
2965      * are not allowed to migrate, so role can't be promoted.
2966      */
2967     rsc->role = RSC_ROLE_STARTED;
2968 
2969     // Check for migrate_from on the target
2970     target_migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2971                                       source, PCMK_OCF_OK, data_set);
2972 
2973     if (/* If the resource state is unknown on the target, it will likely be
2974          * probed there.
2975          * Don't just consider it running there. We will get back here anyway in
2976          * case the probe detects it's running there.
2977          */
2978         !unknown_on_node(rsc->id, target, data_set)
2979         /* If the resource has newer state on the target after the migration
2980          * events, this migrate_to no longer matters for the target.
2981          */
2982         && !newer_state_after_migrate(rsc->id, target, xml_op, target_migrate_from,
2983                                       data_set)) {
2984         /* The resource has no newer state on the target, so assume it's still
2985          * active there.
2986          * (if it is up).
2987          */
2988         pe_node_t *target_node = pe_find_node(data_set->nodes, target);
2989 
2990         if (target_node && target_node->details->online) {
2991             native_add_running(rsc, target_node, data_set, FALSE);
2992         }
2993 
2994     } else if (!non_monitor_after(rsc->id, source, xml_op, true, data_set)) {
2995         /* We know the resource has newer state on the target, but this
2996          * migrate_to still matters for the source as long as there's no newer
2997          * non-monitor operation there.
2998          */
2999 
3000         // Mark node as having dangling migration so we can force a stop later
3001         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
3002     }
3003 }
3004 
3005 static void
3006 unpack_migrate_from_failure(pe_resource_t *rsc, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
3007                             xmlNode *xml_op, pe_working_set_t *data_set)
3008 {
3009     xmlNode *source_migrate_to = NULL;
3010     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
3011     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
3012 
3013     // Sanity check
3014     CRM_CHECK(source && target && !strcmp(target, node->details->uname), return);
3015 
3016     /* If a migration failed, we have to assume the resource is active. Clones
3017      * are not allowed to migrate, so role can't be promoted.
3018      */
3019     rsc->role = RSC_ROLE_STARTED;
3020 
3021     // Check for a migrate_to on the source
3022     source_migrate_to = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATE,
3023                                     source, target, PCMK_OCF_OK, data_set);
3024 
3025     if (/* If the resource state is unknown on the source, it will likely be
3026          * probed there.
3027          * Don't just consider it running there. We will get back here anyway in
3028          * case the probe detects it's running there.
3029          */
3030         !unknown_on_node(rsc->id, source, data_set)
3031         /* If the resource has newer state on the source after the migration
3032          * events, this migrate_from no longer matters for the source.
3033          */
3034         && !newer_state_after_migrate(rsc->id, source, source_migrate_to, xml_op,
3035                                       data_set)) {
3036         /* The resource has no newer state on the source, so assume it's still
3037          * active there (if it is up).
3038          */
3039         pe_node_t *source_node = pe_find_node(data_set->nodes, source);
3040 
3041         if (source_node && source_node->details->online) {
3042             native_add_running(rsc, source_node, data_set, TRUE);
3043         }
3044     }
3045 }
3046 
3047 static void
3048 record_failed_op(xmlNode *op, const pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
3049                  const pe_resource_t *rsc, pe_working_set_t *data_set)
3050 {
3051     xmlNode *xIter = NULL;
3052     const char *op_key = crm_element_value(op, XML_LRM_ATTR_TASK_KEY);
3053 
3054     if (node->details->online == FALSE) {
3055         return;
3056     }
3057 
3058     for (xIter = data_set->failed->children; xIter; xIter = xIter->next) {
3059         const char *key = crm_element_value(xIter, XML_LRM_ATTR_TASK_KEY);
3060         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
3061 
3062         if(pcmk__str_eq(op_key, key, pcmk__str_casei) && pcmk__str_eq(uname, node->details->uname, pcmk__str_casei)) {
3063             crm_trace("Skipping duplicate entry %s on %s",
3064                       op_key, pe__node_name(node));
3065             return;
3066         }
3067     }
3068 
3069     crm_trace("Adding entry %s on %s", op_key, pe__node_name(node));
3070     crm_xml_add(op, XML_ATTR_UNAME, node->details->uname);
3071     crm_xml_add(op, XML_LRM_ATTR_RSCID, rsc->id);
3072     add_node_copy(data_set->failed, op);
3073 }
3074 
3075 static const char *get_op_key(xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
3076 {
3077     const char *key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY);
3078     if(key == NULL) {
3079         key = ID(xml_op);
3080     }
3081     return key;
3082 }
3083 
3084 static const char *
3085 last_change_str(const xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
3086 {
3087     time_t when;
3088     const char *when_s = NULL;
3089 
3090     if (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
3091                                 &when) == pcmk_ok) {
3092         when_s = pcmk__epoch2str(&when);
3093         if (when_s) {
3094             // Skip day of week to make message shorter
3095             when_s = strchr(when_s, ' ');
3096             if (when_s) {
3097                 ++when_s;
3098             }
3099         }
3100     }
3101     return ((when_s && *when_s)? when_s : "unknown time");
3102 }
3103 
3104 /*!
3105  * \internal
3106  * \brief Compare two on-fail values
3107  *
3108  * \param[in] first   One on-fail value to compare
3109  * \param[in] second  The other on-fail value to compare
3110  *
3111  * \return A negative number if second is more severe than first, zero if they
3112  *         are equal, or a positive number if first is more severe than second.
3113  * \note This is only needed until the action_fail_response values can be
3114  *       renumbered at the next API compatibility break.
3115  */
3116 static int
3117 cmp_on_fail(enum action_fail_response first, enum action_fail_response second)
     /* [previous][next][first][last][top][bottom][index][help] */
3118 {
3119     switch (first) {
3120         case action_fail_demote:
3121             switch (second) {
3122                 case action_fail_ignore:
3123                     return 1;
3124                 case action_fail_demote:
3125                     return 0;
3126                 default:
3127                     return -1;
3128             }
3129             break;
3130 
3131         case action_fail_reset_remote:
3132             switch (second) {
3133                 case action_fail_ignore:
3134                 case action_fail_demote:
3135                 case action_fail_recover:
3136                     return 1;
3137                 case action_fail_reset_remote:
3138                     return 0;
3139                 default:
3140                     return -1;
3141             }
3142             break;
3143 
3144         case action_fail_restart_container:
3145             switch (second) {
3146                 case action_fail_ignore:
3147                 case action_fail_demote:
3148                 case action_fail_recover:
3149                 case action_fail_reset_remote:
3150                     return 1;
3151                 case action_fail_restart_container:
3152                     return 0;
3153                 default:
3154                     return -1;
3155             }
3156             break;
3157 
3158         default:
3159             break;
3160     }
3161     switch (second) {
3162         case action_fail_demote:
3163             return (first == action_fail_ignore)? -1 : 1;
3164 
3165         case action_fail_reset_remote:
3166             switch (first) {
3167                 case action_fail_ignore:
3168                 case action_fail_demote:
3169                 case action_fail_recover:
3170                     return -1;
3171                 default:
3172                     return 1;
3173             }
3174             break;
3175 
3176         case action_fail_restart_container:
3177             switch (first) {
3178                 case action_fail_ignore:
3179                 case action_fail_demote:
3180                 case action_fail_recover:
3181                 case action_fail_reset_remote:
3182                     return -1;
3183                 default:
3184                     return 1;
3185             }
3186             break;
3187 
3188         default:
3189             break;
3190     }
3191     return first - second;
3192 }
3193 
3194 static void
3195 unpack_rsc_op_failure(pe_resource_t * rsc, pe_node_t * node, int rc, xmlNode * xml_op, xmlNode ** last_failure,
     /* [previous][next][first][last][top][bottom][index][help] */
3196                       enum action_fail_response * on_fail, pe_working_set_t * data_set)
3197 {
3198     bool is_probe = false;
3199     pe_action_t *action = NULL;
3200 
3201     const char *key = get_op_key(xml_op);
3202     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3203     const char *exit_reason = crm_element_value(xml_op,
3204                                                 XML_LRM_ATTR_EXIT_REASON);
3205 
3206     CRM_ASSERT(rsc);
3207     CRM_CHECK(task != NULL, return);
3208 
3209     *last_failure = xml_op;
3210 
3211     is_probe = pcmk_xe_is_probe(xml_op);
3212 
3213     if (exit_reason == NULL) {
3214         exit_reason = "";
3215     }
3216 
3217     if (!pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)
3218         && (rc == PCMK_OCF_NOT_INSTALLED)) {
3219         crm_trace("Unexpected result (%s%s%s) was recorded for "
3220                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
3221                   services_ocf_exitcode_str(rc),
3222                   (*exit_reason? ": " : ""), exit_reason,
3223                   (is_probe? "probe" : task), rsc->id, pe__node_name(node),
3224                   last_change_str(xml_op), rc, ID(xml_op));
3225     } else {
3226         crm_warn("Unexpected result (%s%s%s) was recorded for "
3227                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
3228                  services_ocf_exitcode_str(rc),
3229                  (*exit_reason? ": " : ""), exit_reason,
3230                  (is_probe? "probe" : task), rsc->id, pe__node_name(node),
3231                  last_change_str(xml_op), rc, ID(xml_op));
3232 
3233         if (is_probe && (rc != PCMK_OCF_OK)
3234             && (rc != PCMK_OCF_NOT_RUNNING)
3235             && (rc != PCMK_OCF_RUNNING_PROMOTED)) {
3236 
3237             /* A failed (not just unexpected) probe result could mean the user
3238              * didn't know resources will be probed even where they can't run.
3239              */
3240             crm_notice("If it is not possible for %s to run on %s, see "
3241                        "the resource-discovery option for location constraints",
3242                        rsc->id, pe__node_name(node));
3243         }
3244 
3245         record_failed_op(xml_op, node, rsc, data_set);
3246     }
3247 
3248     action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3249     if (cmp_on_fail(*on_fail, action->on_fail) < 0) {
3250         pe_rsc_trace(rsc, "on-fail %s -> %s for %s (%s)", fail2text(*on_fail),
3251                      fail2text(action->on_fail), action->uuid, key);
3252         *on_fail = action->on_fail;
3253     }
3254 
3255     if (!strcmp(task, CRMD_ACTION_STOP)) {
3256         resource_location(rsc, node, -INFINITY, "__stop_fail__", data_set);
3257 
3258     } else if (!strcmp(task, CRMD_ACTION_MIGRATE)) {
3259         unpack_migrate_to_failure(rsc, node, xml_op, data_set);
3260 
3261     } else if (!strcmp(task, CRMD_ACTION_MIGRATED)) {
3262         unpack_migrate_from_failure(rsc, node, xml_op, data_set);
3263 
3264     } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
3265         rsc->role = RSC_ROLE_PROMOTED;
3266 
3267     } else if (!strcmp(task, CRMD_ACTION_DEMOTE)) {
3268         if (action->on_fail == action_fail_block) {
3269             rsc->role = RSC_ROLE_PROMOTED;
3270             pe__set_next_role(rsc, RSC_ROLE_STOPPED,
3271                               "demote with on-fail=block");
3272 
3273         } else if(rc == PCMK_OCF_NOT_RUNNING) {
3274             rsc->role = RSC_ROLE_STOPPED;
3275 
3276         } else {
3277             /* Staying in the promoted role would put the scheduler and
3278              * controller into a loop. Setting the role to unpromoted is not
3279              * dangerous because the resource will be stopped as part of
3280              * recovery, and any promotion will be ordered after that stop.
3281              */
3282             rsc->role = RSC_ROLE_UNPROMOTED;
3283         }
3284     }
3285 
3286     if(is_probe && rc == PCMK_OCF_NOT_INSTALLED) {
3287         /* leave stopped */
3288         pe_rsc_trace(rsc, "Leaving %s stopped", rsc->id);
3289         rsc->role = RSC_ROLE_STOPPED;
3290 
3291     } else if (rsc->role < RSC_ROLE_STARTED) {
3292         pe_rsc_trace(rsc, "Setting %s active", rsc->id);
3293         set_active(rsc);
3294     }
3295 
3296     pe_rsc_trace(rsc, "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
3297                  rsc->id, role2text(rsc->role),
3298                  pcmk__btoa(node->details->unclean),
3299                  fail2text(action->on_fail), role2text(action->fail_role));
3300 
3301     if (action->fail_role != RSC_ROLE_STARTED && rsc->next_role < action->fail_role) {
3302         pe__set_next_role(rsc, action->fail_role, "failure");
3303     }
3304 
3305     if (action->fail_role == RSC_ROLE_STOPPED) {
3306         int score = -INFINITY;
3307 
3308         pe_resource_t *fail_rsc = rsc;
3309 
3310         if (fail_rsc->parent) {
3311             pe_resource_t *parent = uber_parent(fail_rsc);
3312 
3313             if (pe_rsc_is_clone(parent)
3314                 && !pcmk_is_set(parent->flags, pe_rsc_unique)) {
3315                 /* For clone resources, if a child fails on an operation
3316                  * with on-fail = stop, all the resources fail.  Do this by preventing
3317                  * the parent from coming up again. */
3318                 fail_rsc = parent;
3319             }
3320         }
3321         crm_notice("%s will not be started under current conditions",
3322                    fail_rsc->id);
3323         /* make sure it doesn't come up again */
3324         if (fail_rsc->allowed_nodes != NULL) {
3325             g_hash_table_destroy(fail_rsc->allowed_nodes);
3326         }
3327         fail_rsc->allowed_nodes = pe__node_list2table(data_set->nodes);
3328         g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
3329     }
3330 
3331     pe_free_action(action);
3332 }
3333 
3334 /*!
3335  * \internal
3336  * \brief Check whether a resource with a failed action can be recovered
3337  *
3338  * If resource action is a failed stop and fencing is not possible, mark the
3339  * resource as unmanaged and blocked, since recovery cannot be done.
3340  *
3341  * \param[in,out] rsc          Resource with failed action
3342  * \param[in]     node         Node where action failed
3343  * \param[in]     task         Name of action that failed
3344  * \param[in]     exit_status  Exit status of failed action (for logging only)
3345  * \param[in]     xml_op       XML of failed action result (for logging only)
3346  */
3347 static void
3348 check_recoverable(pe_resource_t *rsc, pe_node_t *node, const char *task,
     /* [previous][next][first][last][top][bottom][index][help] */
3349                   int exit_status, const xmlNode *xml_op)
3350 {
3351     const char *exit_reason = NULL;
3352 
3353     if (strcmp(task, CRMD_ACTION_STOP) != 0) {
3354         return; // All actions besides stop are always recoverable
3355     }
3356     if (pe_can_fence(node->details->data_set, node)) {
3357         return; // Failed stops are recoverable via fencing
3358     }
3359 
3360     exit_reason = crm_element_value(xml_op, XML_LRM_ATTR_EXIT_REASON);
3361     pe_proc_err("No further recovery can be attempted for %s "
3362                 "because %s on %s failed (%s%s%s) at %s "
3363                 CRM_XS " rc=%d id=%s", rsc->id, task, pe__node_name(node),
3364                 services_ocf_exitcode_str(exit_status),
3365                 ((exit_reason == NULL)? "" : ": "), pcmk__s(exit_reason, ""),
3366                 last_change_str(xml_op), exit_status, ID(xml_op));
3367 
3368     pe__clear_resource_flags(rsc, pe_rsc_managed);
3369     pe__set_resource_flags(rsc, pe_rsc_block);
3370 }
3371 
3372 /*!
3373  * \internal
3374  * \brief Remap informational monitor results and operation status
3375  *
3376  * For the monitor results, certain OCF codes are for providing extended information
3377  * to the user about services that aren't yet failed but not entirely healthy either.
3378  * These must be treated as the "normal" result by Pacemaker.
3379  *
3380  * For operation status, the action result can be used to determine an appropriate
3381  * status for the purposes of responding to the action.  The status provided by the
3382  * executor is not directly usable since the executor does not know what was expected.
3383  *
3384  * \param[in]     xml_op     Operation history entry XML from CIB status
3385  * \param[in,out] rsc        Resource that operation history entry is for
3386  * \param[in]     node       Node where operation was executed
3387  * \param[in]     data_set   Current cluster working set
3388  * \param[in,out] on_fail    What should be done about the result
3389  * \param[in]     target_rc  Expected return code of operation
3390  * \param[in,out] rc         Actual return code of operation
3391  * \param[in,out] status     Operation execution status
3392  *
3393  * \note If the result is remapped and the node is not shutting down or failed,
3394  *       the operation will be recorded in the data set's list of failed operations
3395  *       to highlight it for the user.
3396  *
3397  * \note This may update the resource's current and next role.
3398  */
3399 static void
3400 remap_operation(xmlNode *xml_op, pe_resource_t *rsc, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
3401                 pe_working_set_t *data_set, enum action_fail_response *on_fail,
3402                 int target_rc, int *rc, int *status) {
3403     bool is_probe = false;
3404     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3405     const char *key = get_op_key(xml_op);
3406     const char *exit_reason = crm_element_value(xml_op,
3407                                                 XML_LRM_ATTR_EXIT_REASON);
3408 
3409     if (pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_none)) {
3410         int remapped_rc = pcmk__effective_rc(*rc);
3411 
3412         if (*rc != remapped_rc) {
3413             crm_trace("Remapping monitor result %d to %d", *rc, remapped_rc);
3414             if (!node->details->shutdown || node->details->online) {
3415                 record_failed_op(xml_op, node, rsc, data_set);
3416             }
3417 
3418             *rc = remapped_rc;
3419         }
3420     }
3421 
3422     if (!pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op)) {
3423         *status = PCMK_EXEC_DONE;
3424         *rc = PCMK_OCF_NOT_RUNNING;
3425     }
3426 
3427     /* If the executor reported an operation status of anything but done or
3428      * error, consider that final. But for done or error, we know better whether
3429      * it should be treated as a failure or not, because we know the expected
3430      * result.
3431      */
3432     if (*status != PCMK_EXEC_DONE && *status != PCMK_EXEC_ERROR) {
3433         return;
3434     }
3435 
3436     CRM_ASSERT(rsc);
3437     CRM_CHECK(task != NULL,
3438               *status = PCMK_EXEC_ERROR; return);
3439 
3440     *status = PCMK_EXEC_DONE;
3441 
3442     if (exit_reason == NULL) {
3443         exit_reason = "";
3444     }
3445 
3446     is_probe = pcmk_xe_is_probe(xml_op);
3447 
3448     if (is_probe) {
3449         task = "probe";
3450     }
3451 
3452     if (target_rc < 0) {
3453         /* Pre-1.0 Pacemaker versions, and Pacemaker 1.1.6 or earlier with
3454          * Heartbeat 2.0.7 or earlier as the cluster layer, did not include the
3455          * target_rc in the transition key, which (along with the similar case
3456          * of a corrupted transition key in the CIB) will be reported to this
3457          * function as -1. Pacemaker 2.0+ does not support rolling upgrades from
3458          * those versions or processing of saved CIB files from those versions,
3459          * so we do not need to care much about this case.
3460          */
3461         *status = PCMK_EXEC_ERROR;
3462         crm_warn("Expected result not found for %s on %s (corrupt or obsolete CIB?)",
3463                  key, pe__node_name(node));
3464 
3465     } else if (target_rc != *rc) {
3466         *status = PCMK_EXEC_ERROR;
3467         pe_rsc_debug(rsc, "%s on %s: expected %d (%s), got %d (%s%s%s)",
3468                      key, pe__node_name(node),
3469                      target_rc, services_ocf_exitcode_str(target_rc),
3470                      *rc, services_ocf_exitcode_str(*rc),
3471                      (*exit_reason? ": " : ""), exit_reason);
3472     }
3473 
3474     switch (*rc) {
3475         case PCMK_OCF_OK:
3476             if (is_probe && (target_rc == PCMK_OCF_NOT_RUNNING)) {
3477                 *status = PCMK_EXEC_DONE;
3478                 pe_rsc_info(rsc, "Probe found %s active on %s at %s",
3479                             rsc->id, pe__node_name(node),
3480                             last_change_str(xml_op));
3481             }
3482             break;
3483 
3484         case PCMK_OCF_NOT_RUNNING:
3485             if (is_probe || (target_rc == *rc)
3486                 || !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
3487 
3488                 *status = PCMK_EXEC_DONE;
3489                 rsc->role = RSC_ROLE_STOPPED;
3490 
3491                 /* clear any previous failure actions */
3492                 *on_fail = action_fail_ignore;
3493                 pe__set_next_role(rsc, RSC_ROLE_UNKNOWN, "not running");
3494             }
3495             break;
3496 
3497         case PCMK_OCF_RUNNING_PROMOTED:
3498             if (is_probe && (*rc != target_rc)) {
3499                 *status = PCMK_EXEC_DONE;
3500                 pe_rsc_info(rsc,
3501                             "Probe found %s active and promoted on %s at %s",
3502                             rsc->id, pe__node_name(node),
3503                             last_change_str(xml_op));
3504             }
3505             rsc->role = RSC_ROLE_PROMOTED;
3506             break;
3507 
3508         case PCMK_OCF_DEGRADED_PROMOTED:
3509         case PCMK_OCF_FAILED_PROMOTED:
3510             rsc->role = RSC_ROLE_PROMOTED;
3511             *status = PCMK_EXEC_ERROR;
3512             break;
3513 
3514         case PCMK_OCF_NOT_CONFIGURED:
3515             *status = PCMK_EXEC_ERROR_FATAL;
3516             break;
3517 
3518         case PCMK_OCF_UNIMPLEMENT_FEATURE:
3519             {
3520                 guint interval_ms = 0;
3521                 crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS,
3522                                      &interval_ms);
3523 
3524                 if (interval_ms == 0) {
3525                     check_recoverable(rsc, node, task, *rc, xml_op);
3526                     *status = PCMK_EXEC_ERROR_HARD;
3527                 } else {
3528                     *status = PCMK_EXEC_NOT_SUPPORTED;
3529                 }
3530             }
3531             break;
3532 
3533         case PCMK_OCF_NOT_INSTALLED:
3534         case PCMK_OCF_INVALID_PARAM:
3535         case PCMK_OCF_INSUFFICIENT_PRIV:
3536             check_recoverable(rsc, node, task, *rc, xml_op);
3537             *status = PCMK_EXEC_ERROR_HARD;
3538             break;
3539 
3540         default:
3541             if (*status == PCMK_EXEC_DONE) {
3542                 crm_info("Treating unknown exit status %d from %s of %s "
3543                          "on %s at %s as failure",
3544                          *rc, task, rsc->id, pe__node_name(node),
3545                          last_change_str(xml_op));
3546                 *status = PCMK_EXEC_ERROR;
3547             }
3548             break;
3549     }
3550 
3551     pe_rsc_trace(rsc, "Remapped %s status to '%s'",
3552                  key, pcmk_exec_status_str(*status));
3553 }
3554 
3555 // return TRUE if start or monitor last failure but parameters changed
3556 static bool
3557 should_clear_for_param_change(xmlNode *xml_op, const char *task,
     /* [previous][next][first][last][top][bottom][index][help] */
3558                               pe_resource_t *rsc, pe_node_t *node,
3559                               pe_working_set_t *data_set)
3560 {
3561     if (!strcmp(task, "start") || !strcmp(task, "monitor")) {
3562 
3563         if (pe__bundle_needs_remote_name(rsc, data_set)) {
3564             /* We haven't allocated resources yet, so we can't reliably
3565              * substitute addr parameters for the REMOTE_CONTAINER_HACK.
3566              * When that's needed, defer the check until later.
3567              */
3568             pe__add_param_check(xml_op, rsc, node, pe_check_last_failure,
3569                                 data_set);
3570 
3571         } else {
3572             op_digest_cache_t *digest_data = NULL;
3573 
3574             digest_data = rsc_action_digest_cmp(rsc, xml_op, node, data_set);
3575             switch (digest_data->rc) {
3576                 case RSC_DIGEST_UNKNOWN:
3577                     crm_trace("Resource %s history entry %s on %s"
3578                               " has no digest to compare",
3579                               rsc->id, get_op_key(xml_op), node->details->id);
3580                     break;
3581                 case RSC_DIGEST_MATCH:
3582                     break;
3583                 default:
3584                     return TRUE;
3585             }
3586         }
3587     }
3588     return FALSE;
3589 }
3590 
3591 // Order action after fencing of remote node, given connection rsc
3592 static void
3593 order_after_remote_fencing(pe_action_t *action, pe_resource_t *remote_conn,
     /* [previous][next][first][last][top][bottom][index][help] */
3594                            pe_working_set_t *data_set)
3595 {
3596     pe_node_t *remote_node = pe_find_node(data_set->nodes, remote_conn->id);
3597 
3598     if (remote_node) {
3599         pe_action_t *fence = pe_fence_op(remote_node, NULL, TRUE, NULL,
3600                                          FALSE, data_set);
3601 
3602         order_actions(fence, action, pe_order_implies_then);
3603     }
3604 }
3605 
3606 static bool
3607 should_ignore_failure_timeout(pe_resource_t *rsc, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
3608                               const char *task, guint interval_ms,
3609                               bool is_last_failure, pe_working_set_t *data_set)
3610 {
3611     /* Clearing failures of recurring monitors has special concerns. The
3612      * executor reports only changes in the monitor result, so if the
3613      * monitor is still active and still getting the same failure result,
3614      * that will go undetected after the failure is cleared.
3615      *
3616      * Also, the operation history will have the time when the recurring
3617      * monitor result changed to the given code, not the time when the
3618      * result last happened.
3619      *
3620      * @TODO We probably should clear such failures only when the failure
3621      * timeout has passed since the last occurrence of the failed result.
3622      * However we don't record that information. We could maybe approximate
3623      * that by clearing only if there is a more recent successful monitor or
3624      * stop result, but we don't even have that information at this point
3625      * since we are still unpacking the resource's operation history.
3626      *
3627      * This is especially important for remote connection resources with a
3628      * reconnect interval, so in that case, we skip clearing failures
3629      * if the remote node hasn't been fenced.
3630      */
3631     if (rsc->remote_reconnect_ms
3632         && pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3633         && (interval_ms != 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3634 
3635         pe_node_t *remote_node = pe_find_node(data_set->nodes, rsc->id);
3636 
3637         if (remote_node && !remote_node->details->remote_was_fenced) {
3638             if (is_last_failure) {
3639                 crm_info("Waiting to clear monitor failure for remote node %s"
3640                          " until fencing has occurred", rsc->id);
3641             }
3642             return TRUE;
3643         }
3644     }
3645     return FALSE;
3646 }
3647 
3648 /*!
3649  * \internal
3650  * \brief Check operation age and schedule failure clearing when appropriate
3651  *
3652  * This function has two distinct purposes. The first is to check whether an
3653  * operation history entry is expired (i.e. the resource has a failure timeout,
3654  * the entry is older than the timeout, and the resource either has no fail
3655  * count or its fail count is entirely older than the timeout). The second is to
3656  * schedule fail count clearing when appropriate (i.e. the operation is expired
3657  * and either the resource has an expired fail count or the operation is a
3658  * last_failure for a remote connection resource with a reconnect interval,
3659  * or the operation is a last_failure for a start or monitor operation and the
3660  * resource's parameters have changed since the operation).
3661  *
3662  * \param[in] rsc       Resource that operation happened to
3663  * \param[in] node      Node that operation happened on
3664  * \param[in] rc        Actual result of operation
3665  * \param[in] xml_op    Operation history entry XML
3666  * \param[in] data_set  Current working set
3667  *
3668  * \return TRUE if operation history entry is expired, FALSE otherwise
3669  */
3670 static bool
3671 check_operation_expiry(pe_resource_t *rsc, pe_node_t *node, int rc,
     /* [previous][next][first][last][top][bottom][index][help] */
3672                        xmlNode *xml_op, pe_working_set_t *data_set)
3673 {
3674     bool expired = FALSE;
3675     bool is_last_failure = pcmk__ends_with(ID(xml_op), "_last_failure_0");
3676     time_t last_run = 0;
3677     guint interval_ms = 0;
3678     int unexpired_fail_count = 0;
3679     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3680     const char *clear_reason = NULL;
3681 
3682     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3683 
3684     if ((rsc->failure_timeout > 0)
3685         && (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
3686                                     &last_run) == 0)) {
3687 
3688         // Resource has a failure-timeout, and history entry has a timestamp
3689 
3690         time_t now = get_effective_time(data_set);
3691         time_t last_failure = 0;
3692 
3693         // Is this particular operation history older than the failure timeout?
3694         if ((now >= (last_run + rsc->failure_timeout))
3695             && !should_ignore_failure_timeout(rsc, xml_op, task, interval_ms,
3696                                               is_last_failure, data_set)) {
3697             expired = TRUE;
3698         }
3699 
3700         // Does the resource as a whole have an unexpired fail count?
3701         unexpired_fail_count = pe_get_failcount(node, rsc, &last_failure,
3702                                                 pe_fc_effective, xml_op,
3703                                                 data_set);
3704 
3705         // Update scheduler recheck time according to *last* failure
3706         crm_trace("%s@%lld is %sexpired @%lld with unexpired_failures=%d timeout=%ds"
3707                   " last-failure@%lld",
3708                   ID(xml_op), (long long) last_run, (expired? "" : "not "),
3709                   (long long) now, unexpired_fail_count, rsc->failure_timeout,
3710                   (long long) last_failure);
3711         last_failure += rsc->failure_timeout + 1;
3712         if (unexpired_fail_count && (now < last_failure)) {
3713             pe__update_recheck_time(last_failure, data_set);
3714         }
3715     }
3716 
3717     if (expired) {
3718         if (pe_get_failcount(node, rsc, NULL, pe_fc_default, xml_op, data_set)) {
3719 
3720             // There is a fail count ignoring timeout
3721 
3722             if (unexpired_fail_count == 0) {
3723                 // There is no fail count considering timeout
3724                 clear_reason = "it expired";
3725 
3726             } else {
3727                 /* This operation is old, but there is an unexpired fail count.
3728                  * In a properly functioning cluster, this should only be
3729                  * possible if this operation is not a failure (otherwise the
3730                  * fail count should be expired too), so this is really just a
3731                  * failsafe.
3732                  */
3733                 expired = FALSE;
3734             }
3735 
3736         } else if (is_last_failure && rsc->remote_reconnect_ms) {
3737             /* Clear any expired last failure when reconnect interval is set,
3738              * even if there is no fail count.
3739              */
3740             clear_reason = "reconnect interval is set";
3741         }
3742     }
3743 
3744     if (!expired && is_last_failure
3745         && should_clear_for_param_change(xml_op, task, rsc, node, data_set)) {
3746         clear_reason = "resource parameters have changed";
3747     }
3748 
3749     if (clear_reason != NULL) {
3750         // Schedule clearing of the fail count
3751         pe_action_t *clear_op = pe__clear_failcount(rsc, node, clear_reason,
3752                                                     data_set);
3753 
3754         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3755             && rsc->remote_reconnect_ms) {
3756             /* If we're clearing a remote connection due to a reconnect
3757              * interval, we want to wait until any scheduled fencing
3758              * completes.
3759              *
3760              * We could limit this to remote_node->details->unclean, but at
3761              * this point, that's always true (it won't be reliable until
3762              * after unpack_node_history() is done).
3763              */
3764             crm_info("Clearing %s failure will wait until any scheduled "
3765                      "fencing of %s completes", task, rsc->id);
3766             order_after_remote_fencing(clear_op, rsc, data_set);
3767         }
3768     }
3769 
3770     if (expired && (interval_ms == 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3771         switch(rc) {
3772             case PCMK_OCF_OK:
3773             case PCMK_OCF_NOT_RUNNING:
3774             case PCMK_OCF_RUNNING_PROMOTED:
3775             case PCMK_OCF_DEGRADED:
3776             case PCMK_OCF_DEGRADED_PROMOTED:
3777                 // Don't expire probes that return these values
3778                 expired = FALSE;
3779                 break;
3780         }
3781     }
3782 
3783     return expired;
3784 }
3785 
3786 int pe__target_rc_from_xml(xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
3787 {
3788     int target_rc = 0;
3789     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
3790 
3791     if (key == NULL) {
3792         return -1;
3793     }
3794     decode_transition_key(key, NULL, NULL, NULL, &target_rc);
3795     return target_rc;
3796 }
3797 
3798 static enum action_fail_response
3799 get_action_on_fail(pe_resource_t *rsc, const char *key, const char *task, pe_working_set_t * data_set) 
     /* [previous][next][first][last][top][bottom][index][help] */
3800 {
3801     enum action_fail_response result = action_fail_recover;
3802     pe_action_t *action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3803 
3804     result = action->on_fail;
3805     pe_free_action(action);
3806 
3807     return result;
3808 }
3809 
3810 static void
3811 update_resource_state(pe_resource_t * rsc, pe_node_t * node, xmlNode * xml_op, const char * task, int rc,
     /* [previous][next][first][last][top][bottom][index][help] */
3812                       xmlNode * last_failure, enum action_fail_response * on_fail, pe_working_set_t * data_set)
3813 {
3814     gboolean clear_past_failure = FALSE;
3815 
3816     CRM_ASSERT(rsc);
3817     CRM_ASSERT(xml_op);
3818 
3819     if (rc == PCMK_OCF_NOT_INSTALLED || (!pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op))) {
3820         rsc->role = RSC_ROLE_STOPPED;
3821 
3822     } else if (rc == PCMK_OCF_NOT_RUNNING) {
3823         clear_past_failure = TRUE;
3824 
3825     } else if (pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3826         if (last_failure) {
3827             const char *op_key = get_op_key(xml_op);
3828             const char *last_failure_key = get_op_key(last_failure);
3829 
3830             if (pcmk__str_eq(op_key, last_failure_key, pcmk__str_casei)) {
3831                 clear_past_failure = TRUE;
3832             }
3833         }
3834 
3835         if (rsc->role < RSC_ROLE_STARTED) {
3836             set_active(rsc);
3837         }
3838 
3839     } else if (pcmk__str_eq(task, CRMD_ACTION_START, pcmk__str_casei)) {
3840         rsc->role = RSC_ROLE_STARTED;
3841         clear_past_failure = TRUE;
3842 
3843     } else if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)) {
3844         rsc->role = RSC_ROLE_STOPPED;
3845         clear_past_failure = TRUE;
3846 
3847     } else if (pcmk__str_eq(task, CRMD_ACTION_PROMOTE, pcmk__str_casei)) {
3848         rsc->role = RSC_ROLE_PROMOTED;
3849         clear_past_failure = TRUE;
3850 
3851     } else if (pcmk__str_eq(task, CRMD_ACTION_DEMOTE, pcmk__str_casei)) {
3852 
3853         if (*on_fail == action_fail_demote) {
3854             // Demote clears an error only if on-fail=demote
3855             clear_past_failure = TRUE;
3856         }
3857         rsc->role = RSC_ROLE_UNPROMOTED;
3858 
3859     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
3860         rsc->role = RSC_ROLE_STARTED;
3861         clear_past_failure = TRUE;
3862 
3863     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
3864         unpack_migrate_to_success(rsc, node, xml_op, data_set);
3865 
3866     } else if (rsc->role < RSC_ROLE_STARTED) {
3867         pe_rsc_trace(rsc, "%s active on %s", rsc->id, pe__node_name(node));
3868         set_active(rsc);
3869     }
3870 
3871     /* clear any previous failure actions */
3872     if (clear_past_failure) {
3873         switch (*on_fail) {
3874             case action_fail_stop:
3875             case action_fail_fence:
3876             case action_fail_migrate:
3877             case action_fail_standby:
3878                 pe_rsc_trace(rsc, "%s.%s is not cleared by a completed stop",
3879                              rsc->id, fail2text(*on_fail));
3880                 break;
3881 
3882             case action_fail_block:
3883             case action_fail_ignore:
3884             case action_fail_demote:
3885             case action_fail_recover:
3886             case action_fail_restart_container:
3887                 *on_fail = action_fail_ignore;
3888                 pe__set_next_role(rsc, RSC_ROLE_UNKNOWN, "clear past failures");
3889                 break;
3890             case action_fail_reset_remote:
3891                 if (rsc->remote_reconnect_ms == 0) {
3892                     /* With no reconnect interval, the connection is allowed to
3893                      * start again after the remote node is fenced and
3894                      * completely stopped. (With a reconnect interval, we wait
3895                      * for the failure to be cleared entirely before attempting
3896                      * to reconnect.)
3897                      */
3898                     *on_fail = action_fail_ignore;
3899                     pe__set_next_role(rsc, RSC_ROLE_UNKNOWN,
3900                                       "clear past failures and reset remote");
3901                 }
3902                 break;
3903         }
3904     }
3905 }
3906 
3907 static void
3908 unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
3909               xmlNode **last_failure, enum action_fail_response *on_fail,
3910               pe_working_set_t *data_set)
3911 {
3912     int rc = 0;
3913     int old_rc = 0;
3914     int task_id = 0;
3915     int target_rc = 0;
3916     int old_target_rc = 0;
3917     int status = PCMK_EXEC_UNKNOWN;
3918     guint interval_ms = 0;
3919     const char *task = NULL;
3920     const char *task_key = NULL;
3921     const char *exit_reason = NULL;
3922     bool expired = false;
3923     pe_resource_t *parent = rsc;
3924     enum action_fail_response failure_strategy = action_fail_recover;
3925     bool maskable_probe_failure = false;
3926 
3927     CRM_CHECK(rsc && node && xml_op, return);
3928 
3929     target_rc = pe__target_rc_from_xml(xml_op);
3930     task_key = get_op_key(xml_op);
3931     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3932     exit_reason = crm_element_value(xml_op, XML_LRM_ATTR_EXIT_REASON);
3933     if (exit_reason == NULL) {
3934         exit_reason = "";
3935     }
3936 
3937     crm_element_value_int(xml_op, XML_LRM_ATTR_RC, &rc);
3938     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
3939     crm_element_value_int(xml_op, XML_LRM_ATTR_OPSTATUS, &status);
3940     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3941 
3942     CRM_CHECK(task != NULL, return);
3943     CRM_CHECK((status >= PCMK_EXEC_PENDING) && (status <= PCMK_EXEC_MAX),
3944               return);
3945 
3946     if (!strcmp(task, CRMD_ACTION_NOTIFY) ||
3947         !strcmp(task, CRMD_ACTION_METADATA)) {
3948         /* safe to ignore these */
3949         return;
3950     }
3951 
3952     if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
3953         parent = uber_parent(rsc);
3954     }
3955 
3956     pe_rsc_trace(rsc, "Unpacking task %s/%s (call_id=%d, status=%d, rc=%d) on %s (role=%s)",
3957                  task_key, task, task_id, status, rc, pe__node_name(node),
3958                  role2text(rsc->role));
3959 
3960     if (node->details->unclean) {
3961         pe_rsc_trace(rsc,
3962                      "%s is running on %s, which is unclean (further action "
3963                      "depends on value of stop's on-fail attribute)",
3964                      rsc->id, pe__node_name(node));
3965     }
3966 
3967     /* It should be possible to call remap_operation() first then call
3968      * check_operation_expiry() only if rc != target_rc, because there should
3969      * never be a fail count without at least one unexpected result in the
3970      * resource history. That would be more efficient by avoiding having to call
3971      * check_operation_expiry() for expected results.
3972      *
3973      * However, we do have such configurations in the scheduler regression
3974      * tests, even if it shouldn't be possible with the current code. It's
3975      * probably a good idea anyway, but that would require updating the test
3976      * inputs to something currently possible.
3977      */
3978 
3979     if ((status != PCMK_EXEC_NOT_INSTALLED)
3980         && check_operation_expiry(rsc, node, rc, xml_op, data_set)) {
3981         expired = true;
3982     }
3983 
3984     old_rc = rc;
3985     old_target_rc = target_rc;
3986 
3987     remap_operation(xml_op, rsc, node, data_set, on_fail, target_rc,
3988                     &rc, &status);
3989 
3990     maskable_probe_failure = !pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op);
3991 
3992     if (expired && maskable_probe_failure && old_rc != old_target_rc) {
3993         if (rsc->role <= RSC_ROLE_STOPPED) {
3994             rsc->role = RSC_ROLE_UNKNOWN;
3995         }
3996 
3997         goto done;
3998 
3999     } else if (expired && (rc != target_rc)) {
4000         const char *magic = crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC);
4001 
4002         if (interval_ms == 0) {
4003             crm_notice("Ignoring expired %s failure on %s "
4004                        CRM_XS " actual=%d expected=%d magic=%s",
4005                        task_key, pe__node_name(node), rc, target_rc, magic);
4006             goto done;
4007 
4008         } else if(node->details->online && node->details->unclean == FALSE) {
4009             /* Reschedule the recurring monitor. schedule_cancel() won't work at
4010              * this stage, so as a hacky workaround, forcibly change the restart
4011              * digest so pcmk__check_action_config() does what we want later.
4012              *
4013              * @TODO We should skip this if there is a newer successful monitor.
4014              *       Also, this causes rescheduling only if the history entry
4015              *       has an op-digest (which the expire-non-blocked-failure
4016              *       scheduler regression test doesn't, but that may not be a
4017              *       realistic scenario in production).
4018              */
4019             crm_notice("Rescheduling %s after failure expired on %s "
4020                        CRM_XS " actual=%d expected=%d magic=%s",
4021                        task_key, pe__node_name(node), rc, target_rc, magic);
4022             crm_xml_add(xml_op, XML_LRM_ATTR_RESTART_DIGEST, "calculated-failure-timeout");
4023             goto done;
4024         }
4025     }
4026 
4027     if (maskable_probe_failure) {
4028         crm_notice("Treating probe result '%s' for %s on %s as 'not running'",
4029                    services_ocf_exitcode_str(old_rc), rsc->id,
4030                    pe__node_name(node));
4031         update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure,
4032                               on_fail, data_set);
4033         crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
4034 
4035         record_failed_op(xml_op, node, rsc, data_set);
4036         resource_location(parent, node, -INFINITY, "masked-probe-failure", data_set);
4037         goto done;
4038     }
4039 
4040     switch (status) {
4041         case PCMK_EXEC_CANCELLED:
4042             // Should never happen
4043             pe_err("Resource history contains cancellation '%s' "
4044                    "(%s of %s on %s at %s)",
4045                    ID(xml_op), task, rsc->id, pe__node_name(node),
4046                    last_change_str(xml_op));
4047             goto done;
4048 
4049         case PCMK_EXEC_PENDING:
4050             if (!strcmp(task, CRMD_ACTION_START)) {
4051                 pe__set_resource_flags(rsc, pe_rsc_start_pending);
4052                 set_active(rsc);
4053 
4054             } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
4055                 rsc->role = RSC_ROLE_PROMOTED;
4056 
4057             } else if (!strcmp(task, CRMD_ACTION_MIGRATE) && node->details->unclean) {
4058                 /* If a pending migrate_to action is out on a unclean node,
4059                  * we have to force the stop action on the target. */
4060                 const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
4061                 pe_node_t *target = pe_find_node(data_set->nodes, migrate_target);
4062                 if (target) {
4063                     stop_action(rsc, target, FALSE);
4064                 }
4065             }
4066 
4067             if (rsc->pending_task == NULL) {
4068                 if ((interval_ms != 0) || strcmp(task, CRMD_ACTION_STATUS)) {
4069                     rsc->pending_task = strdup(task);
4070                     rsc->pending_node = node;
4071                 } else {
4072                     /* Pending probes are not printed, even if pending
4073                      * operations are requested. If someone ever requests that
4074                      * behavior, enable the below and the corresponding part of
4075                      * native.c:native_pending_task().
4076                      */
4077 #if 0
4078                     rsc->pending_task = strdup("probe");
4079                     rsc->pending_node = node;
4080 #endif
4081                 }
4082             }
4083             goto done;
4084 
4085         case PCMK_EXEC_DONE:
4086             pe_rsc_trace(rsc, "%s of %s on %s completed at %s " CRM_XS " id=%s",
4087                          task, rsc->id, pe__node_name(node),
4088                          last_change_str(xml_op), ID(xml_op));
4089             update_resource_state(rsc, node, xml_op, task, rc, *last_failure, on_fail, data_set);
4090             goto done;
4091 
4092         case PCMK_EXEC_NOT_INSTALLED:
4093             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
4094             if (failure_strategy == action_fail_ignore) {
4095                 crm_warn("Cannot ignore failed %s of %s on %s: "
4096                          "Resource agent doesn't exist "
4097                          CRM_XS " status=%d rc=%d id=%s",
4098                          task, rsc->id, pe__node_name(node), status, rc,
4099                          ID(xml_op));
4100                 /* Also for printing it as "FAILED" by marking it as pe_rsc_failed later */
4101                 *on_fail = action_fail_migrate;
4102             }
4103             resource_location(parent, node, -INFINITY, "hard-error", data_set);
4104             unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
4105             goto done;
4106 
4107         case PCMK_EXEC_NOT_CONNECTED:
4108             if (pe__is_guest_or_remote_node(node)
4109                 && pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_managed)) {
4110                 /* We should never get into a situation where a managed remote
4111                  * connection resource is considered OK but a resource action
4112                  * behind the connection gets a "not connected" status. But as a
4113                  * fail-safe in case a bug or unusual circumstances do lead to
4114                  * that, ensure the remote connection is considered failed.
4115                  */
4116                 pe__set_resource_flags(node->details->remote_rsc,
4117                                        pe_rsc_failed|pe_rsc_stop);
4118             }
4119             break; // Not done, do error handling
4120 
4121         case PCMK_EXEC_ERROR:
4122         case PCMK_EXEC_ERROR_HARD:
4123         case PCMK_EXEC_ERROR_FATAL:
4124         case PCMK_EXEC_TIMEOUT:
4125         case PCMK_EXEC_NOT_SUPPORTED:
4126         case PCMK_EXEC_INVALID:
4127             break; // Not done, do error handling
4128 
4129         case PCMK_EXEC_NO_FENCE_DEVICE:
4130         case PCMK_EXEC_NO_SECRETS:
4131             status = PCMK_EXEC_ERROR_HARD;
4132             break; // Not done, do error handling
4133     }
4134 
4135     failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
4136     if ((failure_strategy == action_fail_ignore)
4137         || (failure_strategy == action_fail_restart_container
4138             && !strcmp(task, CRMD_ACTION_STOP))) {
4139 
4140         crm_warn("Pretending failed %s (%s%s%s) of %s on %s at %s "
4141                  "succeeded " CRM_XS " rc=%d id=%s",
4142                  task, services_ocf_exitcode_str(rc),
4143                  (*exit_reason? ": " : ""), exit_reason, rsc->id,
4144                  pe__node_name(node), last_change_str(xml_op), rc,
4145                  ID(xml_op));
4146 
4147         update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure,
4148                               on_fail, data_set);
4149         crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
4150         pe__set_resource_flags(rsc, pe_rsc_failure_ignored);
4151 
4152         record_failed_op(xml_op, node, rsc, data_set);
4153 
4154         if ((failure_strategy == action_fail_restart_container)
4155             && cmp_on_fail(*on_fail, action_fail_recover) <= 0) {
4156             *on_fail = failure_strategy;
4157         }
4158 
4159     } else {
4160         unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail,
4161                               data_set);
4162 
4163         if (status == PCMK_EXEC_ERROR_HARD) {
4164             do_crm_log(rc != PCMK_OCF_NOT_INSTALLED?LOG_ERR:LOG_NOTICE,
4165                        "Preventing %s from restarting on %s because "
4166                        "of hard failure (%s%s%s)" CRM_XS " rc=%d id=%s",
4167                        parent->id, pe__node_name(node),
4168                        services_ocf_exitcode_str(rc),
4169                        (*exit_reason? ": " : ""), exit_reason,
4170                        rc, ID(xml_op));
4171             resource_location(parent, node, -INFINITY, "hard-error", data_set);
4172 
4173         } else if (status == PCMK_EXEC_ERROR_FATAL) {
4174             crm_err("Preventing %s from restarting anywhere because "
4175                     "of fatal failure (%s%s%s) " CRM_XS " rc=%d id=%s",
4176                     parent->id, services_ocf_exitcode_str(rc),
4177                     (*exit_reason? ": " : ""), exit_reason,
4178                     rc, ID(xml_op));
4179             resource_location(parent, NULL, -INFINITY, "fatal-error", data_set);
4180         }
4181     }
4182 
4183 done:
4184     pe_rsc_trace(rsc, "Resource %s after %s: role=%s, next=%s",
4185                  rsc->id, task, role2text(rsc->role),
4186                  role2text(rsc->next_role));
4187 }
4188 
4189 static void
4190 add_node_attrs(xmlNode *xml_obj, pe_node_t *node, bool overwrite,
     /* [previous][next][first][last][top][bottom][index][help] */
4191                pe_working_set_t *data_set)
4192 {
4193     const char *cluster_name = NULL;
4194 
4195     pe_rule_eval_data_t rule_data = {
4196         .node_hash = NULL,
4197         .role = RSC_ROLE_UNKNOWN,
4198         .now = data_set->now,
4199         .match_data = NULL,
4200         .rsc_data = NULL,
4201         .op_data = NULL
4202     };
4203 
4204     g_hash_table_insert(node->details->attrs,
4205                         strdup(CRM_ATTR_UNAME), strdup(node->details->uname));
4206 
4207     g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_ID),
4208                         strdup(node->details->id));
4209     if (pcmk__str_eq(node->details->id, data_set->dc_uuid, pcmk__str_casei)) {
4210         data_set->dc_node = node;
4211         node->details->is_dc = TRUE;
4212         g_hash_table_insert(node->details->attrs,
4213                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_TRUE));
4214     } else {
4215         g_hash_table_insert(node->details->attrs,
4216                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_FALSE));
4217     }
4218 
4219     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
4220     if (cluster_name) {
4221         g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_CLUSTER_NAME),
4222                             strdup(cluster_name));
4223     }
4224 
4225     pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_ATTR_SETS, &rule_data,
4226                                node->details->attrs, NULL, overwrite, data_set);
4227 
4228     if (pe_node_attribute_raw(node, CRM_ATTR_SITE_NAME) == NULL) {
4229         const char *site_name = pe_node_attribute_raw(node, "site-name");
4230 
4231         if (site_name) {
4232             g_hash_table_insert(node->details->attrs,
4233                                 strdup(CRM_ATTR_SITE_NAME),
4234                                 strdup(site_name));
4235 
4236         } else if (cluster_name) {
4237             /* Default to cluster-name if unset */
4238             g_hash_table_insert(node->details->attrs,
4239                                 strdup(CRM_ATTR_SITE_NAME),
4240                                 strdup(cluster_name));
4241         }
4242     }
4243 }
4244 
4245 static GList *
4246 extract_operations(const char *node, const char *rsc, xmlNode * rsc_entry, gboolean active_filter)
     /* [previous][next][first][last][top][bottom][index][help] */
4247 {
4248     int counter = -1;
4249     int stop_index = -1;
4250     int start_index = -1;
4251 
4252     xmlNode *rsc_op = NULL;
4253 
4254     GList *gIter = NULL;
4255     GList *op_list = NULL;
4256     GList *sorted_op_list = NULL;
4257 
4258     /* extract operations */
4259     op_list = NULL;
4260     sorted_op_list = NULL;
4261 
4262     for (rsc_op = pcmk__xe_first_child(rsc_entry);
4263          rsc_op != NULL; rsc_op = pcmk__xe_next(rsc_op)) {
4264 
4265         if (pcmk__str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP,
4266                          pcmk__str_none)) {
4267             crm_xml_add(rsc_op, "resource", rsc);
4268             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
4269             op_list = g_list_prepend(op_list, rsc_op);
4270         }
4271     }
4272 
4273     if (op_list == NULL) {
4274         /* if there are no operations, there is nothing to do */
4275         return NULL;
4276     }
4277 
4278     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
4279 
4280     /* create active recurring operations as optional */
4281     if (active_filter == FALSE) {
4282         return sorted_op_list;
4283     }
4284 
4285     op_list = NULL;
4286 
4287     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
4288 
4289     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
4290         xmlNode *rsc_op = (xmlNode *) gIter->data;
4291 
4292         counter++;
4293 
4294         if (start_index < stop_index) {
4295             crm_trace("Skipping %s: not active", ID(rsc_entry));
4296             break;
4297 
4298         } else if (counter < start_index) {
4299             crm_trace("Skipping %s: old", ID(rsc_op));
4300             continue;
4301         }
4302         op_list = g_list_append(op_list, rsc_op);
4303     }
4304 
4305     g_list_free(sorted_op_list);
4306     return op_list;
4307 }
4308 
4309 GList *
4310 find_operations(const char *rsc, const char *node, gboolean active_filter,
     /* [previous][next][first][last][top][bottom][index][help] */
4311                 pe_working_set_t * data_set)
4312 {
4313     GList *output = NULL;
4314     GList *intermediate = NULL;
4315 
4316     xmlNode *tmp = NULL;
4317     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
4318 
4319     pe_node_t *this_node = NULL;
4320 
4321     xmlNode *node_state = NULL;
4322 
4323     for (node_state = pcmk__xe_first_child(status); node_state != NULL;
4324          node_state = pcmk__xe_next(node_state)) {
4325 
4326         if (pcmk__str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
4327             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
4328 
4329             if (node != NULL && !pcmk__str_eq(uname, node, pcmk__str_casei)) {
4330                 continue;
4331             }
4332 
4333             this_node = pe_find_node(data_set->nodes, uname);
4334             if(this_node == NULL) {
4335                 CRM_LOG_ASSERT(this_node != NULL);
4336                 continue;
4337 
4338             } else if (pe__is_guest_or_remote_node(this_node)) {
4339                 determine_remote_online_status(data_set, this_node);
4340 
4341             } else {
4342                 determine_online_status(node_state, this_node, data_set);
4343             }
4344 
4345             if (this_node->details->online
4346                 || pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
4347                 /* offline nodes run no resources...
4348                  * unless stonith is enabled in which case we need to
4349                  *   make sure rsc start events happen after the stonith
4350                  */
4351                 xmlNode *lrm_rsc = NULL;
4352 
4353                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
4354                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
4355 
4356                 for (lrm_rsc = pcmk__xe_first_child(tmp); lrm_rsc != NULL;
4357                      lrm_rsc = pcmk__xe_next(lrm_rsc)) {
4358 
4359                     if (pcmk__str_eq((const char *)lrm_rsc->name,
4360                                      XML_LRM_TAG_RESOURCE, pcmk__str_none)) {
4361 
4362                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
4363 
4364                         if (rsc != NULL && !pcmk__str_eq(rsc_id, rsc, pcmk__str_casei)) {
4365                             continue;
4366                         }
4367 
4368                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
4369                         output = g_list_concat(output, intermediate);
4370                     }
4371                 }
4372             }
4373         }
4374     }
4375 
4376     return output;
4377 }

/* [previous][next][first][last][top][bottom][index][help] */