root/lib/pengine/unpack.c

DEFINITIONS

This source file includes the following definitions.
  1. is_dangling_guest_node
  2. pe_fence_node
  3. set_if_xpath
  4. unpack_config
  5. pe_create_node
  6. expand_remote_rsc_meta
  7. handle_startup_fencing
  8. unpack_nodes
  9. setup_container
  10. unpack_remote_nodes
  11. link_rsc2remotenode
  12. destroy_tag
  13. unpack_resources
  14. unpack_tags
  15. unpack_ticket_state
  16. unpack_tickets_state
  17. unpack_handle_remote_attrs
  18. unpack_transient_attributes
  19. unpack_node_state
  20. unpack_node_history
  21. unpack_status
  22. determine_online_status_no_fencing
  23. determine_online_status_fencing
  24. determine_remote_online_status
  25. determine_online_status
  26. pe_base_name_end
  27. clone_strip
  28. clone_zero
  29. create_fake_resource
  30. create_anonymous_orphan
  31. find_anonymous_clone
  32. unpack_find_resource
  33. process_orphan_resource
  34. process_rsc_state
  35. process_recurring
  36. calculate_active_ops
  37. unpack_shutdown_lock
  38. unpack_lrm_resource
  39. handle_orphaned_container_fillers
  40. unpack_node_lrm
  41. set_active
  42. set_node_score
  43. find_lrm_op
  44. pe__call_id
  45. stop_happened_after
  46. unpack_migrate_to_success
  47. newer_op
  48. unpack_migrate_to_failure
  49. unpack_migrate_from_failure
  50. record_failed_op
  51. get_op_key
  52. last_change_str
  53. cmp_on_fail
  54. unpack_rsc_op_failure
  55. remap_operation
  56. should_clear_for_param_change
  57. order_after_remote_fencing
  58. should_ignore_failure_timeout
  59. check_operation_expiry
  60. pe__target_rc_from_xml
  61. get_action_on_fail
  62. update_resource_state
  63. unpack_rsc_op
  64. add_node_attrs
  65. extract_operations
  66. find_operations

   1 /*
   2  * Copyright 2004-2022 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdio.h>
  13 #include <string.h>
  14 #include <glib.h>
  15 #include <time.h>
  16 
  17 #include <crm/crm.h>
  18 #include <crm/services.h>
  19 #include <crm/msg_xml.h>
  20 #include <crm/common/xml.h>
  21 #include <crm/common/xml_internal.h>
  22 
  23 #include <crm/common/util.h>
  24 #include <crm/pengine/rules.h>
  25 #include <crm/pengine/internal.h>
  26 #include <pe_status_private.h>
  27 
  28 CRM_TRACE_INIT_DATA(pe_status);
  29 
  30 /* This uses pcmk__set_flags_as()/pcmk__clear_flags_as() directly rather than
  31  * pe__set_working_set_flags()/pe__clear_working_set_flags() so that the
  32  * flag is stringified more readably in log messages.
  33  */
  34 #define set_config_flag(data_set, option, flag) do {                        \
  35         const char *scf_value = pe_pref((data_set)->config_hash, (option)); \
  36         if (scf_value != NULL) {                                            \
  37             if (crm_is_true(scf_value)) {                                   \
  38                 (data_set)->flags = pcmk__set_flags_as(__func__, __LINE__,  \
  39                                     LOG_TRACE, "Working set",               \
  40                                     crm_system_name, (data_set)->flags,     \
  41                                     (flag), #flag);                         \
  42             } else {                                                        \
  43                 (data_set)->flags = pcmk__clear_flags_as(__func__, __LINE__,\
  44                                     LOG_TRACE, "Working set",               \
  45                                     crm_system_name, (data_set)->flags,     \
  46                                     (flag), #flag);                         \
  47             }                                                               \
  48         }                                                                   \
  49     } while(0)
  50 
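/* Usage sketch (annotation, not part of the original source): unpack_config()
 * below maps named cluster options onto working-set flags with calls such as
 *
 *     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 *     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
 */
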
  51 static void unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
  52                           xmlNode **last_failure,
  53                           enum action_fail_response *failed,
  54                           pe_working_set_t *data_set);
  55 static void determine_remote_online_status(pe_working_set_t *data_set,
  56                                            pe_node_t *this_node);
  57 static void add_node_attrs(xmlNode *attrs, pe_node_t *node, bool overwrite,
  58                            pe_working_set_t *data_set);
  59 static void determine_online_status(xmlNode *node_state, pe_node_t *this_node,
  60                                     pe_working_set_t *data_set);
  61 
  62 static void unpack_node_lrm(pe_node_t *node, xmlNode *xml,
  63                             pe_working_set_t *data_set);
  64 
  65 
  66 // Bitmask for warnings we only want to print once
  67 uint32_t pe_wo = 0;
  68 
  69 static gboolean
  70 is_dangling_guest_node(pe_node_t *node)
  71 {
  72     /* We are looking for a remote node that was supposed to be mapped to a
  73      * container resource, but all traces of that container have disappeared
  74      * from both the configuration and the status section. */
  75     if (pe__is_guest_or_remote_node(node) &&
  76         node->details->remote_rsc &&
  77         node->details->remote_rsc->container == NULL &&
  78         pcmk_is_set(node->details->remote_rsc->flags,
  79                     pe_rsc_orphan_container_filler)) {
  80         return TRUE;
  81     }
  82 
  83     return FALSE;
  84 }
  85 
  86 /*!
  87  * \brief Schedule a fence action for a node
  88  *
  89  * \param[in,out] data_set  Current working set of cluster
  90  * \param[in,out] node      Node to fence
  91  * \param[in]     reason    Text description of why fencing is needed
  92  * \param[in]     priority_delay  Whether to consider `priority-fencing-delay`
  93  */
  94 void
  95 pe_fence_node(pe_working_set_t * data_set, pe_node_t * node,
  96               const char *reason, bool priority_delay)
  97 {
  98     CRM_CHECK(node, return);
  99 
 100     /* A guest node is fenced by marking its container as failed */
 101     if (pe__is_guest_node(node)) {
 102         pe_resource_t *rsc = node->details->remote_rsc->container;
 103 
 104         if (!pcmk_is_set(rsc->flags, pe_rsc_failed)) {
 105             if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 106                 crm_notice("Not fencing guest node %s "
 107                            "(otherwise would because %s): "
 108                            "its guest resource %s is unmanaged",
 109                            node->details->uname, reason, rsc->id);
 110             } else {
 111                 crm_warn("Guest node %s will be fenced "
 112                          "(by recovering its guest resource %s): %s",
 113                          node->details->uname, rsc->id, reason);
 114 
 115                 /* We don't mark the node as unclean because that would prevent the
 116                  * node from running resources. We want to allow it to run resources
 117                  * in this transition if the recovery succeeds.
 118                  */
 119                 node->details->remote_requires_reset = TRUE;
 120                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
 121             }
 122         }
 123 
 124     } else if (is_dangling_guest_node(node)) {
 125         crm_info("Cleaning up dangling connection for guest node %s: "
 126                  "fencing was already done because %s, "
 127                  "and guest resource no longer exists",
 128                  node->details->uname, reason);
 129         pe__set_resource_flags(node->details->remote_rsc,
 130                                pe_rsc_failed|pe_rsc_stop);
 131 
 132     } else if (pe__is_remote_node(node)) {
 133         pe_resource_t *rsc = node->details->remote_rsc;
 134 
 135         if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 136             crm_notice("Not fencing remote node %s "
 137                        "(otherwise would because %s): connection is unmanaged",
 138                        node->details->uname, reason);
 139         } else if (node->details->remote_requires_reset == FALSE) {
 140             node->details->remote_requires_reset = TRUE;
 141             crm_warn("Remote node %s %s: %s",
 142                      node->details->uname,
 143                      pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 144                      reason);
 145         }
 146         node->details->unclean = TRUE;
 147         // No need to apply `priority-fencing-delay` for remote nodes
 148         pe_fence_op(node, NULL, TRUE, reason, FALSE, data_set);
 149 
 150     } else if (node->details->unclean) {
 151         crm_trace("Cluster node %s %s because %s",
 152                   node->details->uname,
 153                   pe_can_fence(data_set, node)? "would also be fenced" : "also is unclean",
 154                   reason);
 155 
 156     } else {
 157         crm_warn("Cluster node %s %s: %s",
 158                  node->details->uname,
 159                  pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 160                  reason);
 161         node->details->unclean = TRUE;
 162         pe_fence_op(node, NULL, TRUE, reason, priority_delay, data_set);
 163     }
 164 }
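
/* Example (annotation, not part of the original source): unpack_node_state()
 * below schedules fencing via this function when quorum has been lost under
 * no-quorum-policy=suicide:
 *
 *     pe_fence_node(data_set, this_node, "cluster does not have quorum",
 *                   FALSE);
 */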
 165 
 166 // @TODO xpaths can't handle templates, rules, or id-refs
 167 
 168 // nvpair with provides or requires set to unfencing
 169 #define XPATH_UNFENCING_NVPAIR XML_CIB_TAG_NVPAIR                \
 170     "[(@" XML_NVPAIR_ATTR_NAME "='" PCMK_STONITH_PROVIDES "'"    \
 171     "or @" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_REQUIRES "') " \
 172     "and @" XML_NVPAIR_ATTR_VALUE "='unfencing']"
 173 
 174 // unfencing in rsc_defaults or any resource
 175 #define XPATH_ENABLE_UNFENCING \
 176     "/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RESOURCES   \
 177     "//" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR                         \
 178     "|/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RSCCONFIG  \
 179     "/" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR
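
/* For reference (annotation, not part of the original source): assuming the
 * usual msg_xml.h string values for these constants ("cib", "configuration",
 * "resources", "rsc_defaults", "meta_attributes", "nvpair", "name", "value",
 * "provides", "requires"), XPATH_ENABLE_UNFENCING expands to roughly:
 *
 *     /cib/configuration/resources//meta_attributes
 *         /nvpair[(@name='provides' or @name='requires')
 *                 and @value='unfencing']
 *     |/cib/configuration/rsc_defaults/meta_attributes
 *         /nvpair[(@name='provides' or @name='requires')
 *                 and @value='unfencing']
 */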
 180 
 181 static void
 182 set_if_xpath(uint64_t flag, const char *xpath, pe_working_set_t *data_set)
 183 {
 184     xmlXPathObjectPtr result = NULL;
 185 
 186     if (!pcmk_is_set(data_set->flags, flag)) {
 187         result = xpath_search(data_set->input, xpath);
 188         if (result && (numXpathResults(result) > 0)) {
 189             pe__set_working_set_flags(data_set, flag);
 190         }
 191         freeXpathObject(result);
 192     }
 193 }
 194 
 195 gboolean
 196 unpack_config(xmlNode * config, pe_working_set_t * data_set)
 197 {
 198     const char *value = NULL;
 199     GHashTable *config_hash = pcmk__strkey_table(free, free);
 200 
 201     pe_rule_eval_data_t rule_data = {
 202         .node_hash = NULL,
 203         .role = RSC_ROLE_UNKNOWN,
 204         .now = data_set->now,
 205         .match_data = NULL,
 206         .rsc_data = NULL,
 207         .op_data = NULL
 208     };
 209 
 210     data_set->config_hash = config_hash;
 211 
 212     pe__unpack_dataset_nvpairs(config, XML_CIB_TAG_PROPSET, &rule_data, config_hash,
 213                                CIB_OPTIONS_FIRST, FALSE, data_set);
 214 
 215     verify_pe_options(data_set->config_hash);
 216 
 217     set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
 218     if (!pcmk_is_set(data_set->flags, pe_flag_startup_probes)) {
 219         crm_info("Startup probes: disabled (dangerous)");
 220     }
 221 
 222     value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
 223     if (value && crm_is_true(value)) {
 224         crm_info("Watchdog-based self-fencing will be performed via SBD if "
 225                  "fencing is required and stonith-watchdog-timeout is nonzero");
 226         pe__set_working_set_flags(data_set, pe_flag_have_stonith_resource);
 227     }
 228 
 229     /* Set certain flags via xpath here, so they can be used before the relevant
 230      * configuration sections are unpacked.
 231      */
 232     set_if_xpath(pe_flag_enable_unfencing, XPATH_ENABLE_UNFENCING, data_set);
 233 
 234     value = pe_pref(data_set->config_hash, "stonith-timeout");
 235     data_set->stonith_timeout = (int) crm_parse_interval_spec(value);
 236     crm_debug("STONITH timeout: %d", data_set->stonith_timeout);
 237 
 238     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 239     crm_debug("STONITH of failed nodes is %s",
 240               pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)? "enabled" : "disabled");
 241 
 242     data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
 243     if (!strcmp(data_set->stonith_action, "poweroff")) {
 244         pe_warn_once(pe_wo_poweroff,
 245                      "Support for stonith-action of 'poweroff' is deprecated "
 246                      "and will be removed in a future release (use 'off' instead)");
 247         data_set->stonith_action = "off";
 248     }
 249     crm_trace("STONITH will %s nodes", data_set->stonith_action);
 250 
 251     set_config_flag(data_set, "concurrent-fencing", pe_flag_concurrent_fencing);
 252     crm_debug("Concurrent fencing is %s",
 253               pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)? "enabled" : "disabled");
 254 
 255     value = pe_pref(data_set->config_hash,
 256                     XML_CONFIG_ATTR_PRIORITY_FENCING_DELAY);
 257     if (value) {
 258         data_set->priority_fencing_delay = crm_parse_interval_spec(value) / 1000;
 259         crm_trace("Priority fencing delay is %ds", data_set->priority_fencing_delay);
 260     }
 261 
 262     set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
 263     crm_debug("Stop all active resources: %s",
 264               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_stop_everything)));
 265 
 266     set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
 267     if (pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)) {
 268         crm_debug("Cluster is symmetric - resources can run anywhere by default");
 269     }
 270 
 271     value = pe_pref(data_set->config_hash, "no-quorum-policy");
 272 
 273     if (pcmk__str_eq(value, "ignore", pcmk__str_casei)) {
 274         data_set->no_quorum_policy = no_quorum_ignore;
 275 
 276     } else if (pcmk__str_eq(value, "freeze", pcmk__str_casei)) {
 277         data_set->no_quorum_policy = no_quorum_freeze;
 278 
 279     } else if (pcmk__str_eq(value, "demote", pcmk__str_casei)) {
 280         data_set->no_quorum_policy = no_quorum_demote;
 281 
 282     } else if (pcmk__str_eq(value, "suicide", pcmk__str_casei)) {
 283         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 284             int do_panic = 0;
 285 
 286             crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC,
 287                                   &do_panic);
 288             if (do_panic || pcmk_is_set(data_set->flags, pe_flag_have_quorum)) {
 289                 data_set->no_quorum_policy = no_quorum_suicide;
 290             } else {
 291                 crm_notice("Resetting no-quorum-policy to 'stop': cluster has never had quorum");
 292                 data_set->no_quorum_policy = no_quorum_stop;
 293             }
 294         } else {
 295             pcmk__config_err("Resetting no-quorum-policy to 'stop' because "
 296                              "fencing is disabled");
 297             data_set->no_quorum_policy = no_quorum_stop;
 298         }
 299 
 300     } else {
 301         data_set->no_quorum_policy = no_quorum_stop;
 302     }
 303 
 304     switch (data_set->no_quorum_policy) {
 305         case no_quorum_freeze:
 306             crm_debug("On loss of quorum: Freeze resources");
 307             break;
 308         case no_quorum_stop:
 309             crm_debug("On loss of quorum: Stop ALL resources");
 310             break;
 311         case no_quorum_demote:
 312             crm_debug("On loss of quorum: "
 313                       "Demote promotable resources and stop other resources");
 314             break;
 315         case no_quorum_suicide:
 316             crm_notice("On loss of quorum: Fence all remaining nodes");
 317             break;
 318         case no_quorum_ignore:
 319             crm_notice("On loss of quorum: Ignore");
 320             break;
 321     }
 322 
 323     set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
 324     crm_trace("Orphan resources are %s",
 325               pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)? "stopped" : "ignored");
 326 
 327     set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
 328     crm_trace("Orphan resource actions are %s",
 329               pcmk_is_set(data_set->flags, pe_flag_stop_action_orphans)? "stopped" : "ignored");
 330 
 331     value = pe_pref(data_set->config_hash, "remove-after-stop");
 332     if (value != NULL) {
 333         if (crm_is_true(value)) {
 334             pe__set_working_set_flags(data_set, pe_flag_remove_after_stop);
 335 #ifndef PCMK__COMPAT_2_0
 336             pe_warn_once(pe_wo_remove_after,
 337                          "Support for the remove-after-stop cluster property is"
 338                          " deprecated and will be removed in a future release");
 339 #endif
 340         } else {
 341             pe__clear_working_set_flags(data_set, pe_flag_remove_after_stop);
 342         }
 343     }
 344 
 345     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
 346     crm_trace("Maintenance mode: %s",
 347               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_maintenance_mode)));
 348 
 349     set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
 350     crm_trace("Start failures are %s",
 351               pcmk_is_set(data_set->flags, pe_flag_start_failure_fatal)? "always fatal" : "handled by failcount");
 352 
 353     if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 354         set_config_flag(data_set, "startup-fencing", pe_flag_startup_fencing);
 355     }
 356     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 357         crm_trace("Unseen nodes will be fenced");
 358     } else {
 359         pe_warn_once(pe_wo_blind, "Blind faith: not fencing unseen nodes");
 360     }
 361 
 362     pe__unpack_node_health_scores(data_set);
 363 
 364     data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
 365     crm_trace("Placement strategy: %s", data_set->placement_strategy);
 366 
 367     set_config_flag(data_set, "shutdown-lock", pe_flag_shutdown_lock);
 368     crm_trace("Resources will%s be locked to cleanly shut down nodes",
 369               (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)? "" : " not"));
 370     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
 371         value = pe_pref(data_set->config_hash,
 372                         XML_CONFIG_ATTR_SHUTDOWN_LOCK_LIMIT);
 373         data_set->shutdown_lock = crm_parse_interval_spec(value) / 1000;
 374         crm_trace("Shutdown locks expire after %us", data_set->shutdown_lock);
 375     }
 376 
 377     return TRUE;
 378 }
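
/* Illustrative input (annotation, not part of the original source): a minimal
 * cluster_property_set exercising several of the options read above (the id
 * values are hypothetical):
 *
 *     <crm_config>
 *       <cluster_property_set id="cib-bootstrap-options">
 *         <nvpair id="opt-fence" name="stonith-enabled" value="true"/>
 *         <nvpair id="opt-quorum" name="no-quorum-policy" value="freeze"/>
 *         <nvpair id="opt-lock" name="shutdown-lock" value="true"/>
 *       </cluster_property_set>
 *     </crm_config>
 */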
 379 
 380 pe_node_t *
 381 pe_create_node(const char *id, const char *uname, const char *type,
 382                const char *score, pe_working_set_t * data_set)
 383 {
 384     pe_node_t *new_node = NULL;
 385 
 386     if (pe_find_node(data_set->nodes, uname) != NULL) {
 387         pcmk__config_warn("More than one node entry has name '%s'", uname);
 388     }
 389 
 390     new_node = calloc(1, sizeof(pe_node_t));
 391     if (new_node == NULL) {
 392         return NULL;
 393     }
 394 
 395     new_node->weight = char2score(score);
 396     new_node->fixed = FALSE;
 397     new_node->details = calloc(1, sizeof(struct pe_node_shared_s));
 398 
 399     if (new_node->details == NULL) {
 400         free(new_node);
 401         return NULL;
 402     }
 403 
 404     crm_trace("Creating node for entry %s/%s", uname, id);
 405     new_node->details->id = id;
 406     new_node->details->uname = uname;
 407     new_node->details->online = FALSE;
 408     new_node->details->shutdown = FALSE;
 409     new_node->details->rsc_discovery_enabled = TRUE;
 410     new_node->details->running_rsc = NULL;
 411     new_node->details->data_set = data_set;
 412 
 413     if (pcmk__str_eq(type, "member", pcmk__str_null_matches | pcmk__str_casei)) {
 414         new_node->details->type = node_member;
 415 
 416     } else if (pcmk__str_eq(type, "remote", pcmk__str_casei)) {
 417         new_node->details->type = node_remote;
 418         pe__set_working_set_flags(data_set, pe_flag_have_remote_nodes);
 419 
 420     } else {
 421         /* @COMPAT 'ping' is the default for backward compatibility, but it
 422          * should be changed to 'member' at a compatibility break
 423          */
 424         if (!pcmk__str_eq(type, "ping", pcmk__str_casei)) {
 425             pcmk__config_warn("Node %s has unrecognized type '%s', "
 426                               "assuming 'ping'", crm_str(uname), type);
 427         }
 428         pe_warn_once(pe_wo_ping_node,
 429                      "Support for nodes of type 'ping' (such as %s) is "
 430                      "deprecated and will be removed in a future release",
 431                      crm_str(uname));
 432         new_node->details->type = node_ping;
 433     }
 434 
 435     new_node->details->attrs = pcmk__strkey_table(free, free);
 436 
 437     if (pe__is_guest_or_remote_node(new_node)) {
 438         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 439                             strdup("remote"));
 440     } else {
 441         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 442                             strdup("cluster"));
 443     }
 444 
 445     new_node->details->utilization = pcmk__strkey_table(free, free);
 446     new_node->details->digest_cache = pcmk__strkey_table(free,
 447                                                           pe__free_digests);
 448 
 449     data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node, sort_node_uname);
 450     return new_node;
 451 }
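
/* Illustrative input (annotation, not part of the original source): the
 * <nodes> entries that unpack_nodes() below feeds to this function typically
 * look like
 *
 *     <node id="101" uname="node1" type="member"/>
 *
 * where type may also be "remote", or the deprecated "ping".
 */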
 452 
 453 static const char *
 454 expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, pe_working_set_t *data)
 455 {
 456     xmlNode *attr_set = NULL;
 457     xmlNode *attr = NULL;
 458 
 459     const char *container_id = ID(xml_obj);
 460     const char *remote_name = NULL;
 461     const char *remote_server = NULL;
 462     const char *remote_port = NULL;
 463     const char *connect_timeout = "60s";
 464     const char *remote_allow_migrate = NULL;
 465     const char *is_managed = NULL;
 466 
 467     for (attr_set = pcmk__xe_first_child(xml_obj); attr_set != NULL;
 468          attr_set = pcmk__xe_next(attr_set)) {
 469 
 470         if (!pcmk__str_eq((const char *)attr_set->name, XML_TAG_META_SETS,
 471                           pcmk__str_casei)) {
 472             continue;
 473         }
 474 
 475         for (attr = pcmk__xe_first_child(attr_set); attr != NULL;
 476              attr = pcmk__xe_next(attr)) {
 477             const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
 478             const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);
 479 
 480             if (pcmk__str_eq(name, XML_RSC_ATTR_REMOTE_NODE, pcmk__str_casei)) {
 481                 remote_name = value;
 482             } else if (pcmk__str_eq(name, "remote-addr", pcmk__str_casei)) {
 483                 remote_server = value;
 484             } else if (pcmk__str_eq(name, "remote-port", pcmk__str_casei)) {
 485                 remote_port = value;
 486             } else if (pcmk__str_eq(name, "remote-connect-timeout", pcmk__str_casei)) {
 487                 connect_timeout = value;
 488             } else if (pcmk__str_eq(name, "remote-allow-migrate", pcmk__str_casei)) {
 489                 remote_allow_migrate = value;
 490             } else if (pcmk__str_eq(name, XML_RSC_ATTR_MANAGED, pcmk__str_casei)) {
 491                 is_managed = value;
 492             }
 493         }
 494     }
 495 
 496     if (remote_name == NULL) {
 497         return NULL;
 498     }
 499 
 500     if (pe_find_resource(data->resources, remote_name) != NULL) {
 501         return NULL;
 502     }
 503 
 504     pe_create_remote_xml(parent, remote_name, container_id,
 505                          remote_allow_migrate, is_managed,
 506                          connect_timeout, remote_server, remote_port);
 507     return remote_name;
 508 }
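
/* Illustrative input (annotation, not part of the original source): a
 * primitive carrying the guest-node meta-attributes scanned for above (the
 * resource and id values are hypothetical):
 *
 *     <primitive id="vm1" class="ocf" provider="heartbeat" type="VirtualDomain">
 *       <meta_attributes id="vm1-meta">
 *         <nvpair id="vm1-rn" name="remote-node" value="guest1"/>
 *         <nvpair id="vm1-ra" name="remote-addr" value="192.168.122.10"/>
 *       </meta_attributes>
 *     </primitive>
 *
 * Given such input, pe_create_remote_xml() injects an ocf:pacemaker:remote
 * primitive named "guest1" into the configuration.
 */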
 509 
 510 static void
 511 handle_startup_fencing(pe_working_set_t *data_set, pe_node_t *new_node)
 512 {
 513     if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
 514         /* Ignore fencing for remote nodes that don't have a connection resource
 515          * associated with them. This happens when remote node entries get left
 516          * in the nodes section after the connection resource is removed.
 517          */
 518         return;
 519     }
 520 
 521     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 522         // All nodes are unclean until we've seen their status entry
 523         new_node->details->unclean = TRUE;
 524 
 525     } else {
 526         // Blind faith ...
 527         new_node->details->unclean = FALSE;
 528     }
 529 
 530     /* We need to be able to determine if a node's status section
 531      * exists or not, separately from whether the node is unclean. */
 532     new_node->details->unseen = TRUE;
 533 }
 534 
 535 gboolean
 536 unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
 537 {
 538     xmlNode *xml_obj = NULL;
 539     pe_node_t *new_node = NULL;
 540     const char *id = NULL;
 541     const char *uname = NULL;
 542     const char *type = NULL;
 543     const char *score = NULL;
 544 
 545     pe_rule_eval_data_t rule_data = {
 546         .node_hash = NULL,
 547         .role = RSC_ROLE_UNKNOWN,
 548         .now = data_set->now,
 549         .match_data = NULL,
 550         .rsc_data = NULL,
 551         .op_data = NULL
 552     };
 553 
 554     for (xml_obj = pcmk__xe_first_child(xml_nodes); xml_obj != NULL;
 555          xml_obj = pcmk__xe_next(xml_obj)) {
 556 
 557         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, pcmk__str_none)) {
 558             new_node = NULL;
 559 
 560             id = crm_element_value(xml_obj, XML_ATTR_ID);
 561             uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
 562             type = crm_element_value(xml_obj, XML_ATTR_TYPE);
 563             score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
 564             crm_trace("Processing node %s/%s", uname, id);
 565 
 566             if (id == NULL) {
 567                 pcmk__config_err("Ignoring <" XML_CIB_TAG_NODE
 568                                  "> entry in configuration without id");
 569                 continue;
 570             }
 571             new_node = pe_create_node(id, uname, type, score, data_set);
 572 
 573             if (new_node == NULL) {
 574                 return FALSE;
 575             }
 576 
 577 /*              if(data_set->have_quorum == FALSE */
 578 /*                 && data_set->no_quorum_policy == no_quorum_stop) { */
 579 /*                      /\* start shutting resources down *\/ */
 580 /*                      new_node->weight = -INFINITY; */
 581 /*              } */
 582 
 583             handle_startup_fencing(data_set, new_node);
 584 
 585             add_node_attrs(xml_obj, new_node, FALSE, data_set);
 586             pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_UTILIZATION, &rule_data,
 587                                        new_node->details->utilization, NULL,
 588                                        FALSE, data_set);
 589 
 590             crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
 591         }
 592     }
 593 
 594     if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
 595         crm_info("Creating a fake local node");
 596         pe_create_node(data_set->localhost, data_set->localhost, NULL, 0,
 597                        data_set);
 598     }
 599 
 600     return TRUE;
 601 }
 602 
 603 static void
 604 setup_container(pe_resource_t * rsc, pe_working_set_t * data_set)
 605 {
 606     const char *container_id = NULL;
 607 
 608     if (rsc->children) {
 609         g_list_foreach(rsc->children, (GFunc) setup_container, data_set);
 610         return;
 611     }
 612 
 613     container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
 614     if (container_id && !pcmk__str_eq(container_id, rsc->id, pcmk__str_casei)) {
 615         pe_resource_t *container = pe_find_resource(data_set->resources, container_id);
 616 
 617         if (container) {
 618             rsc->container = container;
 619             pe__set_resource_flags(container, pe_rsc_is_container);
 620             container->fillers = g_list_append(container->fillers, rsc);
 621             pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
 622         } else {
 623             pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
 624         }
 625     }
 626 }
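
/* Illustrative input (annotation, not part of the original source): the
 * container relationship checked above comes from a meta-attribute on the
 * filler resource, assuming XML_RSC_ATTR_CONTAINER is the string "container":
 *
 *     <nvpair id="f1-container" name="container" value="vm1"/>
 */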
 627 
 628 gboolean
 629 unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
 630 {
 631     xmlNode *xml_obj = NULL;
 632 
 633     /* Create remote nodes and guest nodes from the resource configuration
 634      * before unpacking resources.
 635      */
 636     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 637          xml_obj = pcmk__xe_next(xml_obj)) {
 638 
 639         const char *new_node_id = NULL;
 640 
 641         /* Check for remote nodes, which are defined by ocf:pacemaker:remote
 642          * primitives.
 643          */
 644         if (xml_contains_remote_node(xml_obj)) {
 645             new_node_id = ID(xml_obj);
 646             /* The "pe_find_node" check is here to make sure we don't iterate over
 647              * an expanded node that has already been added to the node list. */
 648             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 649                 crm_trace("Found remote node %s defined by resource %s",
 650                           new_node_id, ID(xml_obj));
 651                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 652                                data_set);
 653             }
 654             continue;
 655         }
 656 
 657         /* Check for guest nodes, which are defined by special meta-attributes
 658          * of a primitive of any type (for example, VirtualDomain or Xen).
 659          */
 660         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RESOURCE, pcmk__str_none)) {
 661             /* This will add an ocf:pacemaker:remote primitive to the
 662              * configuration for the guest node's connection, to be unpacked
 663              * later.
 664              */
 665             new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, data_set);
 666             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 667                 crm_trace("Found guest node %s in resource %s",
 668                           new_node_id, ID(xml_obj));
 669                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 670                                data_set);
 671             }
 672             continue;
 673         }
 674 
 675         /* Check for guest nodes inside a group. Clones are currently not
 676          * supported as guest nodes.
 677          */
 678         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_GROUP, pcmk__str_none)) {
 679             xmlNode *xml_obj2 = NULL;
 680             for (xml_obj2 = pcmk__xe_first_child(xml_obj); xml_obj2 != NULL;
 681                  xml_obj2 = pcmk__xe_next(xml_obj2)) {
 682 
 683                 new_node_id = expand_remote_rsc_meta(xml_obj2, xml_resources, data_set);
 684 
 685                 if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 686                     crm_trace("Found guest node %s in resource %s inside group %s",
 687                               new_node_id, ID(xml_obj2), ID(xml_obj));
 688                     pe_create_node(new_node_id, new_node_id, "remote", NULL,
 689                                    data_set);
 690                 }
 691             }
 692         }
 693     }
 694     return TRUE;
 695 }
 696 
 697 /* Call this after all the nodes and resources have been
 698  * unpacked, but before the status section is read.
 699  *
 700  * A remote node's online status is reflected by the state
 701  * of the remote node's connection resource. We need to link
 702  * the remote node to this connection resource so we can have
 703  * easy access to the connection resource during the scheduler calculations.
 704  */
 705 static void
 706 link_rsc2remotenode(pe_working_set_t *data_set, pe_resource_t *new_rsc)
 707 {
 708     pe_node_t *remote_node = NULL;
 709 
 710     if (new_rsc->is_remote_node == FALSE) {
 711         return;
 712     }
 713 
 714     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 715         /* remote_nodes and remote_resources are not linked in quick location calculations */
 716         return;
 717     }
 718 
 719     remote_node = pe_find_node(data_set->nodes, new_rsc->id);
 720     CRM_CHECK(remote_node != NULL, return);
 721 
 722     pe_rsc_trace(new_rsc, "Linking remote connection resource %s to node %s",
 723                  new_rsc->id, remote_node->details->uname);
 724     remote_node->details->remote_rsc = new_rsc;
 725 
 726     if (new_rsc->container == NULL) {
 727         /* Handle start-up fencing for remote nodes (as opposed to guest nodes)
 728          * the same as is done for cluster nodes.
 729          */
 730         handle_startup_fencing(data_set, remote_node);
 731 
 732     } else {
 733         /* pe_create_node() marks the new node as "remote" or "cluster"; now
 734          * that we know the node is a guest node, update it correctly.
 735          */
 736         g_hash_table_replace(remote_node->details->attrs, strdup(CRM_ATTR_KIND),
 737                              strdup("container"));
 738     }
 739 }
 740 
 741 static void
 742 destroy_tag(gpointer data)
 743 {
 744     pe_tag_t *tag = data;
 745 
 746     if (tag) {
 747         free(tag->id);
 748         g_list_free_full(tag->refs, free);
 749         free(tag);
 750     }
 751 }
 752 
 753 /*!
 754  * \internal
 755  * \brief Parse configuration XML for resource information
 756  *
 757  * \param[in]     xml_resources  Top of resource configuration XML
 758  * \param[in,out] data_set       Where to put resource information
 759  *
 760  * \return TRUE
 761  *
 762  * \note unpack_remote_nodes() MUST be called before this, so that the nodes can
 763  *       be used when common_unpack() calls resource_location()
 764  */
 765 gboolean
 766 unpack_resources(xmlNode * xml_resources, pe_working_set_t * data_set)
 767 {
 768     xmlNode *xml_obj = NULL;
 769     GList *gIter = NULL;
 770 
 771     data_set->template_rsc_sets = pcmk__strkey_table(free, destroy_tag);
 772 
 773     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 774          xml_obj = pcmk__xe_next(xml_obj)) {
 775 
 776         pe_resource_t *new_rsc = NULL;
 777 
 778         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE, pcmk__str_none)) {
 779             const char *template_id = ID(xml_obj);
 780 
 781             if (template_id && g_hash_table_lookup_extended(data_set->template_rsc_sets,
 782                                                             template_id, NULL, NULL) == FALSE) {
 783                 /* Record the template's ID so we at least know it exists. */
 784                 g_hash_table_insert(data_set->template_rsc_sets, strdup(template_id), NULL);
 785             }
 786             continue;
 787         }
 788 
 789         crm_trace("Beginning unpack... <%s id=%s... >", crm_element_name(xml_obj), ID(xml_obj));
 790         if (common_unpack(xml_obj, &new_rsc, NULL, data_set) && (new_rsc != NULL)) {
 791             data_set->resources = g_list_append(data_set->resources, new_rsc);
 792             pe_rsc_trace(new_rsc, "Added resource %s", new_rsc->id);
 793 
 794         } else {
 795             pcmk__config_err("Ignoring <%s> resource '%s' "
 796                              "because configuration is invalid",
 797                              crm_element_name(xml_obj), crm_str(ID(xml_obj)));
 798             if (new_rsc != NULL && new_rsc->fns != NULL) {
 799                 new_rsc->fns->free(new_rsc);
 800             }
 801         }
 802     }
 803 
 804     for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
 805         pe_resource_t *rsc = (pe_resource_t *) gIter->data;
 806 
 807         setup_container(rsc, data_set);
 808         link_rsc2remotenode(data_set, rsc);
 809     }
 810 
 811     data_set->resources = g_list_sort(data_set->resources, sort_rsc_priority);
 812     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 813         /* Ignore */
 814 
 815     } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
 816                && !pcmk_is_set(data_set->flags, pe_flag_have_stonith_resource)) {
 817 
 818         pcmk__config_err("Resource start-up disabled since no STONITH resources have been defined");
 819         pcmk__config_err("Either configure some or disable STONITH with the stonith-enabled option");
 820         pcmk__config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
 821     }
 822 
 823     return TRUE;
 824 }
 825 
 826 gboolean
 827 unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
 828 {
 829     xmlNode *xml_tag = NULL;
 830 
 831     data_set->tags = pcmk__strkey_table(free, destroy_tag);
 832 
 833     for (xml_tag = pcmk__xe_first_child(xml_tags); xml_tag != NULL;
 834          xml_tag = pcmk__xe_next(xml_tag)) {
 835 
 836         xmlNode *xml_obj_ref = NULL;
 837         const char *tag_id = ID(xml_tag);
 838 
 839         if (!pcmk__str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, pcmk__str_none)) {
 840             continue;
 841         }
 842 
 843         if (tag_id == NULL) {
 844             pcmk__config_err("Ignoring <%s> without " XML_ATTR_ID,
 845                              crm_element_name(xml_tag));
 846             continue;
 847         }
 848 
 849         for (xml_obj_ref = pcmk__xe_first_child(xml_tag); xml_obj_ref != NULL;
 850              xml_obj_ref = pcmk__xe_next(xml_obj_ref)) {
 851 
 852             const char *obj_ref = ID(xml_obj_ref);
 853 
 854             if (!pcmk__str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, pcmk__str_none)) {
 855                 continue;
 856             }
 857 
 858             if (obj_ref == NULL) {
 859                 pcmk__config_err("Ignoring <%s> for tag '%s' without " XML_ATTR_ID,
 860                                  crm_element_name(xml_obj_ref), tag_id);
 861                 continue;
 862             }
 863 
 864             if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
 865                 return FALSE;
 866             }
 867         }
 868     }
 869 
 870     return TRUE;
 871 }
 872 
 873 /* The ticket state section:
 874  * "/cib/status/tickets/ticket_state" */
 875 static gboolean
 876 unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
 877 {
 878     const char *ticket_id = NULL;
 879     const char *granted = NULL;
 880     const char *last_granted = NULL;
 881     const char *standby = NULL;
 882     xmlAttrPtr xIter = NULL;
 883 
 884     pe_ticket_t *ticket = NULL;
 885 
 886     ticket_id = ID(xml_ticket);
 887     if (pcmk__str_empty(ticket_id)) {
 888         return FALSE;
 889     }
 890 
 891     crm_trace("Processing ticket state for %s", ticket_id);
 892 
 893     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
 894     if (ticket == NULL) {
 895         ticket = ticket_new(ticket_id, data_set);
 896         if (ticket == NULL) {
 897             return FALSE;
 898         }
 899     }
 900 
 901     for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
 902         const char *prop_name = (const char *)xIter->name;
 903         const char *prop_value = crm_element_value(xml_ticket, prop_name);
 904 
 905         if (pcmk__str_eq(prop_name, XML_ATTR_ID, pcmk__str_none)) {
 906             continue;
 907         }
 908         g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
 909     }
 910 
 911     granted = g_hash_table_lookup(ticket->state, "granted");
 912     if (granted && crm_is_true(granted)) {
 913         ticket->granted = TRUE;
 914         crm_info("We have ticket '%s'", ticket->id);
 915     } else {
 916         ticket->granted = FALSE;
 917         crm_info("We do not have ticket '%s'", ticket->id);
 918     }
 919 
 920     last_granted = g_hash_table_lookup(ticket->state, "last-granted");
 921     if (last_granted) {
 922         long long last_granted_ll;
 923 
 924         pcmk__scan_ll(last_granted, &last_granted_ll, 0LL);
 925         ticket->last_granted = (time_t) last_granted_ll;
 926     }
 927 
 928     standby = g_hash_table_lookup(ticket->state, "standby");
 929     if (standby && crm_is_true(standby)) {
 930         ticket->standby = TRUE;
 931         if (ticket->granted) {
 932             crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
 933         }
 934     } else {
 935         ticket->standby = FALSE;
 936     }
 937 
 938     crm_trace("Done with ticket state for %s", ticket_id);
 939 
 940     return TRUE;
 941 }
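
/* Illustrative input (annotation, not part of the original source): a
 * ticket_state entry as it appears under /cib/status/tickets, where
 * last-granted is an epoch timestamp:
 *
 *     <ticket_state id="ticketA" granted="true" last-granted="1648578100"/>
 */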
 942 
 943 static gboolean
 944 unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
 945 {
 946     xmlNode *xml_obj = NULL;
 947 
 948     for (xml_obj = pcmk__xe_first_child(xml_tickets); xml_obj != NULL;
 949          xml_obj = pcmk__xe_next(xml_obj)) {
 950 
 951         if (!pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, pcmk__str_none)) {
 952             continue;
 953         }
 954         unpack_ticket_state(xml_obj, data_set);
 955     }
 956 
 957     return TRUE;
 958 }
 959 
 960 static void
 961 unpack_handle_remote_attrs(pe_node_t *this_node, xmlNode *state, pe_working_set_t * data_set) 
 962 {
 963     const char *resource_discovery_enabled = NULL;
 964     xmlNode *attrs = NULL;
 965     pe_resource_t *rsc = NULL;
 966 
 967     if (!pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
 968         return;
 969     }
 970 
 971     if ((this_node == NULL) || !pe__is_guest_or_remote_node(this_node)) {
 972         return;
 973     }
 974     crm_trace("Processing remote node id=%s, uname=%s", this_node->details->id, this_node->details->uname);
 975 
 976     pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_MAINTENANCE),
 977                        &(this_node->details->remote_maintenance), 0);
 978 
 979     rsc = this_node->details->remote_rsc;
 980     if (this_node->details->remote_requires_reset == FALSE) {
 981         this_node->details->unclean = FALSE;
 982         this_node->details->unseen = FALSE;
 983     }
 984     attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
 985     add_node_attrs(attrs, this_node, TRUE, data_set);
 986 
 987     if (pe__shutdown_requested(this_node)) {
 988         crm_info("Node %s is shutting down", this_node->details->uname);
 989         this_node->details->shutdown = TRUE;
 990     }
 991  
 992     if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
 993         crm_info("Node %s is in standby-mode", this_node->details->uname);
 994         this_node->details->standby = TRUE;
 995     }
 996 
 997     if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance")) ||
 998         ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed))) {
 999         crm_info("Node %s is in maintenance-mode", this_node->details->uname);
1000         this_node->details->maintenance = TRUE;
1001     }
1002 
1003     resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
1004     if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
1005         if (pe__is_remote_node(this_node)
1006             && !pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1007             crm_warn("Ignoring %s attribute on remote node %s because stonith is disabled",
1008                      XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
1009         } else {
1010             /* This is either a remote node with fencing enabled, or a guest
1011              * node. We don't care whether fencing is enabled when fencing guest
1012              * nodes, because they are "fenced" by recovering their containing
1013              * resource.
1014              */
1015             crm_info("Node %s has resource discovery disabled", this_node->details->uname);
1016             this_node->details->rsc_discovery_enabled = FALSE;
1017         }
1018     }
1019 }
1020 
1021 /*!
1022  * \internal
1023  * \brief Unpack a cluster node's transient attributes
1024  *
1025  * \param[in] state     CIB node state XML
1026  * \param[in] node      Cluster node whose attributes are being unpacked
1027  * \param[in] data_set  Cluster working set
1028  */
1029 static void
1030 unpack_transient_attributes(xmlNode *state, pe_node_t *node,
1031                             pe_working_set_t *data_set)
1032 {
1033     const char *discovery = NULL;
1034     xmlNode *attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
1035 
1036     add_node_attrs(attrs, node, TRUE, data_set);
1037 
1038     if (crm_is_true(pe_node_attribute_raw(node, "standby"))) {
1039         crm_info("Node %s is in standby-mode", node->details->uname);
1040         node->details->standby = TRUE;
1041     }
1042 
1043     if (crm_is_true(pe_node_attribute_raw(node, "maintenance"))) {
1044         crm_info("Node %s is in maintenance-mode", node->details->uname);
1045         node->details->maintenance = TRUE;
1046     }
1047 
1048     discovery = pe_node_attribute_raw(node, XML_NODE_ATTR_RSC_DISCOVERY);
1049     if ((discovery != NULL) && !crm_is_true(discovery)) {
1050         crm_warn("Ignoring %s attribute for node %s because disabling "
1051                  "resource discovery is not allowed for cluster nodes",
1052                  XML_NODE_ATTR_RSC_DISCOVERY, node->details->uname);
1053     }
1054 }
1055 
1056 /*!
1057  * \internal
1058  * \brief Unpack a node state entry (first pass)
1059  *
1060  * Unpack one node state entry from status. This unpacks information from the
1061  * node_state element itself and node attributes inside it, but not the
1062  * resource history inside it. Multiple passes through the status are needed to
1063  * fully unpack everything.
1064  *
1065  * \param[in] state     CIB node state XML
1066  * \param[in] data_set  Cluster working set
1067  */
1068 static void
1069 unpack_node_state(xmlNode *state, pe_working_set_t *data_set)
1070 {
1071     const char *id = NULL;
1072     const char *uname = NULL;
1073     pe_node_t *this_node = NULL;
1074 
1075     id = crm_element_value(state, XML_ATTR_ID);
1076     if (id == NULL) {
1077         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1078                  XML_ATTR_ID);
1079         return;
1080     }
1081 
1082     uname = crm_element_value(state, XML_ATTR_UNAME);
1083     if (uname == NULL) {
1084         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1085                  XML_ATTR_UNAME);
1086         return;
1087     }
1088 
1089     this_node = pe_find_node_any(data_set->nodes, id, uname);
1090     if (this_node == NULL) {
1091         pcmk__config_warn("Ignoring recorded node state for '%s' because "
1092                           "it is no longer in the configuration", uname);
1093         return;
1094     }
1095 
1096     if (pe__is_guest_or_remote_node(this_node)) {
1097         /* We can't determine the online status of Pacemaker Remote nodes until
1098          * after all resource history has been unpacked. In this first pass, we
1099          * do need to mark whether the node has been fenced, as this plays a
1100          * role during unpacking cluster node resource state.
1101          */
1102         pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_FENCED),
1103                            &(this_node->details->remote_was_fenced), 0);
1104         return;
1105     }
1106 
1107     unpack_transient_attributes(state, this_node, data_set);
1108 
1109     /* Provisionally mark this cluster node as clean. We have at least seen it
1110      * in the current cluster's lifetime.
1111      */
1112     this_node->details->unclean = FALSE;
1113     this_node->details->unseen = FALSE;
1114 
1115     crm_trace("Determining online status of cluster node %s (id %s)",
1116               this_node->details->uname, id);
1117     determine_online_status(state, this_node, data_set);
1118 
1119     if (!pcmk_is_set(data_set->flags, pe_flag_have_quorum)
1120         && this_node->details->online
1121         && (data_set->no_quorum_policy == no_quorum_suicide)) {
1122         /* Everything else should flow from this automatically
1123          * (at least until the scheduler becomes able to migrate off
1124          * healthy resources)
1125          */
1126         pe_fence_node(data_set, this_node, "cluster does not have quorum",
1127                       FALSE);
1128     }
1129 }
1130 
1131 /*!
1132  * \internal
1133  * \brief Unpack nodes' resource history as much as possible
1134  *
1135  * Unpack as many nodes' resource history as possible in one pass through the
1136  * status. We need to process Pacemaker Remote nodes' connections/containers
1137  * before unpacking their history; the connection/container history will be
1138  * in another node's history, so it might take multiple passes to unpack
1139  * everything.
1140  *
1141  * \param[in] status    CIB XML status section
1142  * \param[in] fence     If true, treat any not-yet-unpacked nodes as unseen
1143  * \param[in] data_set  Cluster working set
1144  *
1145  * \return Standard Pacemaker return code (specifically pcmk_rc_ok if done,
1146  *         or EAGAIN if more unpacking remains to be done)
1147  */
1148 static int
1149 unpack_node_history(xmlNode *status, bool fence, pe_working_set_t *data_set)
1150 {
1151     int rc = pcmk_rc_ok;
1152 
1153     // Loop through all node_state entries in CIB status
1154     for (xmlNode *state = first_named_child(status, XML_CIB_TAG_STATE);
1155          state != NULL; state = crm_next_same_xml(state)) {
1156 
1157         const char *id = ID(state);
1158         const char *uname = crm_element_value(state, XML_ATTR_UNAME);
1159         pe_node_t *this_node = NULL;
1160 
1161         if ((id == NULL) || (uname == NULL)) {
1162             // Warning already logged in first pass through status section
1163             crm_trace("Not unpacking resource history from malformed "
1164                       XML_CIB_TAG_STATE " without id and/or uname");
1165             continue;
1166         }
1167 
1168         this_node = pe_find_node_any(data_set->nodes, id, uname);
1169         if (this_node == NULL) {
1170             // Warning already logged in first pass through status section
1171             crm_trace("Not unpacking resource history for node %s because "
1172                       "no longer in configuration", id);
1173             continue;
1174         }
1175 
1176         if (this_node->details->unpacked) {
1177             crm_trace("Not unpacking resource history for node %s because "
1178                       "already unpacked", id);
1179             continue;
1180         }
1181 
1182         if (fence) {
1183             // We're processing all remaining nodes
1184 
1185         } else if (pe__is_guest_node(this_node)) {
1186             /* We can unpack a guest node's history only after we've unpacked
1187              * other resource history to the point that we know that the node's
1188              * connection and containing resource are both up.
1189              */
1190             pe_resource_t *rsc = this_node->details->remote_rsc;
1191 
1192             if ((rsc == NULL) || (rsc->role != RSC_ROLE_STARTED)
1193                 || (rsc->container->role != RSC_ROLE_STARTED)) {
1194                 crm_trace("Not unpacking resource history for guest node %s "
1195                           "because container and connection are not known to "
1196                           "be up", id);
1197                 continue;
1198             }
1199 
1200         } else if (pe__is_remote_node(this_node)) {
1201             /* We can unpack a remote node's history only after we've unpacked
1202              * other resource history to the point that we know that the node's
1203              * connection is up, with the exception of when shutdown locks are
1204              * in use.
1205              */
1206             pe_resource_t *rsc = this_node->details->remote_rsc;
1207 
1208             if ((rsc == NULL)
1209                 || (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)
1210                     && (rsc->role != RSC_ROLE_STARTED))) {
1211                 crm_trace("Not unpacking resource history for remote node %s "
1212                           "because connection is not known to be up", id);
1213                 continue;
1214             }
1215 
1216         /* If fencing and shutdown locks are disabled and we're not processing
1217          * unseen nodes, then we don't want to unpack offline nodes until online
1218          * nodes have been unpacked. This allows us to number active clone
1219          * instances first.
1220          */
1221         } else if (!pcmk_any_flags_set(data_set->flags, pe_flag_stonith_enabled
1222                                                         |pe_flag_shutdown_lock)
1223                    && !this_node->details->online) {
1224             crm_trace("Not unpacking resource history for offline "
1225                       "cluster node %s", id);
1226             continue;
1227         }
1228 
1229         if (pe__is_guest_or_remote_node(this_node)) {
1230             determine_remote_online_status(data_set, this_node);
1231             unpack_handle_remote_attrs(this_node, state, data_set);
1232         }
1233 
1234         crm_trace("Unpacking resource history for %snode %s",
1235                   (fence? "unseen " : ""), id);
1236 
1237         this_node->details->unpacked = TRUE;
1238         unpack_node_lrm(this_node, state, data_set);
1239 
1240         rc = EAGAIN; // Other node histories might depend on this one
1241     }
1242     return rc;
1243 }
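     /* Note on the return value: callers retry while this returns EAGAIN,
      * since each pass may unpack connection histories that let guest and
      * remote node histories be unpacked on a later pass (see the loop in
      * unpack_status() below).
      */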
1244 
1245 /* Unpack the status section: determine which nodes are live, and record
1246  * where resources are active (node resource histories establish the
1247  * cluster's current state) */
1248 gboolean
1249 unpack_status(xmlNode * status, pe_working_set_t * data_set)
1250 {
1251     xmlNode *state = NULL;
1252 
1253     crm_trace("Beginning unpack");
1254 
1255     if (data_set->tickets == NULL) {
1256         data_set->tickets = pcmk__strkey_table(free, destroy_ticket);
1257     }
1258 
1259     for (state = pcmk__xe_first_child(status); state != NULL;
1260          state = pcmk__xe_next(state)) {
1261 
1262         if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, pcmk__str_none)) {
1263             unpack_tickets_state((xmlNode *) state, data_set);
1264 
1265         } else if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
1266             unpack_node_state(state, data_set);
1267         }
1268     }
1269 
1270     while (unpack_node_history(status, FALSE, data_set) == EAGAIN) {
1271         crm_trace("Another pass through node resource histories is needed");
1272     }
1273 
1274     // Now catch any nodes we didn't see
1275     unpack_node_history(status,
1276                         pcmk_is_set(data_set->flags, pe_flag_stonith_enabled),
1277                         data_set);
1278 
1279     /* Now that we know where resources are, we can schedule stops of containers
1280      * with failed bundle connections
1281      */
1282     if (data_set->stop_needed != NULL) {
1283         for (GList *item = data_set->stop_needed; item; item = item->next) {
1284             pe_resource_t *container = item->data;
1285             pe_node_t *node = pe__current_node(container);
1286 
1287             if (node) {
1288                 stop_action(container, node, FALSE);
1289             }
1290         }
1291         g_list_free(data_set->stop_needed);
1292         data_set->stop_needed = NULL;
1293     }
1294 
1295     /* Now that we know status of all Pacemaker Remote connections and nodes,
1296      * we can stop connections for node shutdowns, and check the online status
1297      * of remote/guest nodes that didn't have any node history to unpack.
1298      */
1299     for (GList *gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
1300         pe_node_t *this_node = gIter->data;
1301 
1302         if (!pe__is_guest_or_remote_node(this_node)) {
1303             continue;
1304         }
1305         if (this_node->details->shutdown
1306             && (this_node->details->remote_rsc != NULL)) {
1307             pe__set_next_role(this_node->details->remote_rsc, RSC_ROLE_STOPPED,
1308                               "remote shutdown");
1309         }
1310         if (!this_node->details->unpacked) {
1311             determine_remote_online_status(data_set, this_node);
1312         }
1313     }
1314 
1315     return TRUE;
1316 }
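     /* A minimal usage sketch (hedged; in the real code this is driven by
      * cluster_status(), and the lookup shown here is illustrative):
      *
      *     xmlNode *status = first_named_child(data_set->input,
      *                                         XML_CIB_TAG_STATUS);
      *
      *     if (status != NULL) {
      *         unpack_status(status, data_set);
      *     }
      */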
1317 
1318 static gboolean
1319 determine_online_status_no_fencing(pe_working_set_t * data_set, xmlNode * node_state,
1320                                    pe_node_t * this_node)
1321 {
1322     gboolean online = FALSE;
1323     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1324     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1325     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1326     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1327 
1328     if (!crm_is_true(in_cluster)) {
1329         crm_trace("Node is down: in_cluster=%s", crm_str(in_cluster));
1330 
1331     } else if (pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei)) {
1332         if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1333             online = TRUE;
1334         } else {
1335             crm_debug("Node is not ready to run resources: %s", join);
1336         }
1337 
1338     } else if (this_node->details->expected_up == FALSE) {
1339         crm_trace("Controller is down: in_cluster=%s", crm_str(in_cluster));
1340         crm_trace("\tis_peer=%s, join=%s, expected=%s",
1341                   crm_str(is_peer), crm_str(join), crm_str(exp_state));
1342 
1343     } else {
1344         /* mark it unclean */
1345         pe_fence_node(data_set, this_node, "peer is unexpectedly down", FALSE);
1346         crm_info("\tin_cluster=%s, is_peer=%s, join=%s, expected=%s",
1347                  crm_str(in_cluster), crm_str(is_peer), crm_str(join), crm_str(exp_state));
1348     }
1349     return online;
1350 }
1351 
1352 static gboolean
1353 determine_online_status_fencing(pe_working_set_t * data_set, xmlNode * node_state,
1354                                 pe_node_t * this_node)
1355 {
1356     gboolean online = FALSE;
1357     gboolean do_terminate = FALSE;
1358     bool crmd_online = FALSE;
1359     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1360     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1361     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1362     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1363     const char *terminate = pe_node_attribute_raw(this_node, "terminate");
1364 
1365 /*
1366   - XML_NODE_IN_CLUSTER    ::= true|false
1367   - XML_NODE_IS_PEER       ::= online|offline
1368   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
1369   - XML_NODE_EXPECTED      ::= member|down
1370 */
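     /* For example, these attributes typically appear in node_state XML like
      * the following (hypothetical values):
      *
      *     <node_state id="1" uname="node1" in_ccm="true" crmd="online"
      *                 join="member" expected="member"/>
      */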
1371 
1372     if (crm_is_true(terminate)) {
1373         do_terminate = TRUE;
1374 
1375     } else if (terminate != NULL && strlen(terminate) > 0) {
1376         /* could be a time() value */
1377         char t = terminate[0];
1378 
1379         if (t != '0' && isdigit(t)) {
1380             do_terminate = TRUE;
1381         }
1382     }
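         /* For example (hypothetical attribute values): terminate="true" takes
          * the crm_is_true() path above, while terminate="1652304000" (a time()
          * value) is caught by the leading-nonzero-digit check; "0" and "false"
          * do not request termination.
          */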
1383 
1384     crm_trace("%s: in_cluster=%s, is_peer=%s, join=%s, expected=%s, term=%d",
1385               this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1386               crm_str(join), crm_str(exp_state), do_terminate);
1387 
1388     online = crm_is_true(in_cluster);
1389     crmd_online = pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei);
1390     if (exp_state == NULL) {
1391         exp_state = CRMD_JOINSTATE_DOWN;
1392     }
1393 
1394     if (this_node->details->shutdown) {
1395         crm_debug("%s is shutting down", this_node->details->uname);
1396 
1397         /* Slightly different criteria since we can't shut down a dead peer */
1398         online = crmd_online;
1399 
1400     } else if (in_cluster == NULL) {
1401         pe_fence_node(data_set, this_node, "peer has not been seen by the cluster", FALSE);
1402 
1403     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_NACK, pcmk__str_casei)) {
1404         pe_fence_node(data_set, this_node,
1405                       "peer failed Pacemaker membership criteria", FALSE);
1406 
1407     } else if (do_terminate == FALSE && pcmk__str_eq(exp_state, CRMD_JOINSTATE_DOWN, pcmk__str_casei)) {
1408 
1409         if (crm_is_true(in_cluster) || crmd_online) {
1410             crm_info("Node %s is not ready to run resources", this_node->details->uname);
1411             this_node->details->standby = TRUE;
1412             this_node->details->pending = TRUE;
1413 
1414         } else {
1415             crm_trace("%s is down or still coming up", this_node->details->uname);
1416         }
1417 
1418     } else if (do_terminate && pcmk__str_eq(join, CRMD_JOINSTATE_DOWN, pcmk__str_casei)
1419                && crm_is_true(in_cluster) == FALSE && !crmd_online) {
1420         crm_info("Node %s was just shot", this_node->details->uname);
1421         online = FALSE;
1422 
1423     } else if (crm_is_true(in_cluster) == FALSE) {
1424         // Consider `priority-fencing-delay` for lost nodes
1425         pe_fence_node(data_set, this_node, "peer is no longer part of the cluster", TRUE);
1426 
1427     } else if (!crmd_online) {
1428         pe_fence_node(data_set, this_node, "peer process is no longer available", FALSE);
1429 
1430         /* Everything is running at this point, now check join state */
1431     } else if (do_terminate) {
1432         pe_fence_node(data_set, this_node, "termination was requested", FALSE);
1433 
1434     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1435         crm_info("Node %s is active", this_node->details->uname);
1436 
1437     } else if (pcmk__strcase_any_of(join, CRMD_JOINSTATE_PENDING, CRMD_JOINSTATE_DOWN, NULL)) {
1438         crm_info("Node %s is not ready to run resources", this_node->details->uname);
1439         this_node->details->standby = TRUE;
1440         this_node->details->pending = TRUE;
1441 
1442     } else {
1443         pe_fence_node(data_set, this_node, "peer was in an unknown state", FALSE);
1444         crm_warn("%s: in-cluster=%s, is-peer=%s, join=%s, expected=%s, term=%d, shutdown=%d",
1445                  this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1446                  crm_str(join), crm_str(exp_state), do_terminate, this_node->details->shutdown);
1447     }
1448 
1449     return online;
1450 }
1451 
1452 static void
1453 determine_remote_online_status(pe_working_set_t * data_set, pe_node_t * this_node)
1454 {
1455     pe_resource_t *rsc = this_node->details->remote_rsc;
1456     pe_resource_t *container = NULL;
1457     pe_node_t *host = NULL;
1458 
1459     /* If there is a node state entry for a (former) Pacemaker Remote node
1460      * but no resource creating that node, the node's connection resource will
1461      * be NULL. Consider it an offline remote node in that case.
1462      */
1463     if (rsc == NULL) {
1464         this_node->details->online = FALSE;
1465         goto remote_online_done;
1466     }
1467 
1468     container = rsc->container;
1469 
1470     if (container && pcmk__list_of_1(rsc->running_on)) {
1471         host = rsc->running_on->data;
1472     }
1473 
1474     /* If the resource is currently started, mark it online. */
1475     if (rsc->role == RSC_ROLE_STARTED) {
1476         crm_trace("%s node %s presumed ONLINE because connection resource is started",
1477                   (container? "Guest" : "Remote"), this_node->details->id);
1478         this_node->details->online = TRUE;
1479     }
1480 
1481     /* consider this node shutting down if transitioning start->stop */
1482     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
1483         crm_trace("%s node %s shutting down because connection resource is stopping",
1484                   (container? "Guest" : "Remote"), this_node->details->id);
1485         this_node->details->shutdown = TRUE;
1486     }
1487 
1488     /* Now check all the failure conditions. */
1489     if (container && pcmk_is_set(container->flags, pe_rsc_failed)) {
1490         crm_trace("Guest node %s UNCLEAN because guest resource failed",
1491                   this_node->details->id);
1492         this_node->details->online = FALSE;
1493         this_node->details->remote_requires_reset = TRUE;
1494 
1495     } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
1496         crm_trace("%s node %s OFFLINE because connection resource failed",
1497                   (container? "Guest" : "Remote"), this_node->details->id);
1498         this_node->details->online = FALSE;
1499 
1500     } else if (rsc->role == RSC_ROLE_STOPPED
1501                || (container && container->role == RSC_ROLE_STOPPED)) {
1502 
1503         crm_trace("%s node %s OFFLINE because its resource is stopped",
1504                   (container? "Guest" : "Remote"), this_node->details->id);
1505         this_node->details->online = FALSE;
1506         this_node->details->remote_requires_reset = FALSE;
1507 
1508     } else if (host && (host->details->online == FALSE)
1509                && host->details->unclean) {
1510         crm_trace("Guest node %s UNCLEAN because host is unclean",
1511                   this_node->details->id);
1512         this_node->details->online = FALSE;
1513         this_node->details->remote_requires_reset = TRUE;
1514     }
1515 
1516 remote_online_done:
1517     crm_trace("Remote node %s online=%s", this_node->details->id,
1518               (this_node->details->online? "TRUE" : "FALSE"));
1519 }
1520 
1521 static void
1522 determine_online_status(xmlNode * node_state, pe_node_t * this_node, pe_working_set_t * data_set)
1523 {
1524     gboolean online = FALSE;
1525     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1526 
1527     CRM_CHECK(this_node != NULL, return);
1528 
1529     this_node->details->shutdown = FALSE;
1530     this_node->details->expected_up = FALSE;
1531 
1532     if (pe__shutdown_requested(this_node)) {
1533         this_node->details->shutdown = TRUE;
1534 
1535     } else if (pcmk__str_eq(exp_state, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1536         this_node->details->expected_up = TRUE;
1537     }
1538 
1539     if (this_node->details->type == node_ping) {
1540         this_node->details->unclean = FALSE;
1541         online = FALSE;         /* As far as resource management is concerned,
1542                                  * the node is safely offline.
1543                                  * Anyone caught abusing this logic will be shot
1544                                  */
1545 
1546     } else if (!pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1547         online = determine_online_status_no_fencing(data_set, node_state, this_node);
1548 
1549     } else {
1550         online = determine_online_status_fencing(data_set, node_state, this_node);
1551     }
1552 
1553     if (online) {
1554         this_node->details->online = TRUE;
1555 
1556     } else {
1557         /* remove node from contention */
1558         this_node->fixed = TRUE;
1559         this_node->weight = -INFINITY;
1560     }
1561 
1562     if (online && this_node->details->shutdown) {
1563         /* don't run resources here */
1564         this_node->fixed = TRUE;
1565         this_node->weight = -INFINITY;
1566     }
1567 
1568     if (this_node->details->type == node_ping) {
1569         crm_info("Node %s is not a Pacemaker node", this_node->details->uname);
1570 
1571     } else if (this_node->details->unclean) {
1572         pe_proc_warn("Node %s is unclean", this_node->details->uname);
1573 
1574     } else if (this_node->details->online) {
1575         crm_info("Node %s is %s", this_node->details->uname,
1576                  this_node->details->shutdown ? "shutting down" :
1577                  this_node->details->pending ? "pending" :
1578                  this_node->details->standby ? "standby" :
1579                  this_node->details->maintenance ? "maintenance" : "online");
1580 
1581     } else {
1582         crm_trace("Node %s is offline", this_node->details->uname);
1583     }
1584 }
1585 
1586 /*!
1587  * \internal
1588  * \brief Find the end of a resource's name, excluding any clone suffix
1589  *
1590  * \param[in] id  Resource ID to check
1591  *
1592  * \return Pointer to last character of resource's base name, or NULL if id is empty
1593  */
1594 const char *
1595 pe_base_name_end(const char *id)
1596 {
1597     if (!pcmk__str_empty(id)) {
1598         const char *end = id + strlen(id) - 1;
1599 
1600         for (const char *s = end; s > id; --s) {
1601             switch (*s) {
1602                 case '0':
1603                 case '1':
1604                 case '2':
1605                 case '3':
1606                 case '4':
1607                 case '5':
1608                 case '6':
1609                 case '7':
1610                 case '8':
1611                 case '9':
1612                     break;
1613                 case ':':
1614                     return (s == end)? s : (s - 1);
1615                 default:
1616                     return end;
1617             }
1618         }
1619         return end;
1620     }
1621     return NULL;
1622 }
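     /* A hedged illustration of the scan above: for "myclone:10", the trailing
      * digits are skipped, the ':' is found mid-string, and a pointer to the
      * final 'e' of "myclone" is returned; for "rsc1", the first character that
      * is neither a digit nor ':' ends the scan, so the pointer to the last
      * character ('1') is returned unchanged.
      */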
1623 
1624 /*!
1625  * \internal
1626  * \brief Get a resource name excluding any clone suffix
1627  *
1628  * \param[in] last_rsc_id  Resource ID to check
1629  *
1630  * \return Pointer to newly allocated string with resource's base name
1631  * \note It is the caller's responsibility to free() the result.
1632  *       This asserts on error, so callers can assume result is not NULL.
1633  */
1634 char *
1635 clone_strip(const char *last_rsc_id)
1636 {
1637     const char *end = pe_base_name_end(last_rsc_id);
1638     char *basename = NULL;
1639 
1640     CRM_ASSERT(end);
1641     basename = strndup(last_rsc_id, end - last_rsc_id + 1);
1642     CRM_ASSERT(basename);
1643     return basename;
1644 }
1645 
1646 /*!
1647  * \internal
1648  * \brief Get the name of the first instance of a cloned resource
1649  *
1650  * \param[in] last_rsc_id  Resource ID to check
1651  *
1652  * \return Pointer to newly allocated string with resource's base name plus :0
1653  * \note It is the caller's responsibility to free() the result.
1654  *       This asserts on error, so callers can assume result is not NULL.
1655  */
1656 char *
1657 clone_zero(const char *last_rsc_id)
1658 {
1659     const char *end = pe_base_name_end(last_rsc_id);
1660     size_t base_name_len = end - last_rsc_id + 1;
1661     char *zero = NULL;
1662 
1663     CRM_ASSERT(end);
1664     zero = calloc(base_name_len + 3, sizeof(char));
1665     CRM_ASSERT(zero);
1666     memcpy(zero, last_rsc_id, base_name_len);
1667     zero[base_name_len] = ':';
1668     zero[base_name_len + 1] = '0';
1669     return zero;
1670 }
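     /* Hedged usage sketch for the two helpers above (resource names are
      * hypothetical; both results must be freed by the caller):
      *
      *     char *base = clone_strip("myclone:10");  // yields "myclone"
      *     char *zero = clone_zero("rsc1");         // yields "rsc1:0"
      *
      *     free(base);
      *     free(zero);
      */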
1671 
1672 static pe_resource_t *
1673 create_fake_resource(const char *rsc_id, xmlNode * rsc_entry, pe_working_set_t * data_set)
1674 {
1675     pe_resource_t *rsc = NULL;
1676     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
1677 
1678     copy_in_properties(xml_rsc, rsc_entry);
1679     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
1680     crm_log_xml_debug(xml_rsc, "Orphan resource");
1681 
1682     if (!common_unpack(xml_rsc, &rsc, NULL, data_set)) {
1683         return NULL;
1684     }
1685 
1686     if (xml_contains_remote_node(xml_rsc)) {
1687         pe_node_t *node;
1688 
1689         crm_debug("Detected orphaned remote node %s", rsc_id);
1690         node = pe_find_node(data_set->nodes, rsc_id);
1691         if (node == NULL) {
1692             node = pe_create_node(rsc_id, rsc_id, "remote", NULL, data_set);
1693         }
1694         link_rsc2remotenode(data_set, rsc);
1695 
1696         if (node) {
1697             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
1698             node->details->shutdown = TRUE;
1699         }
1700     }
1701 
1702     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
1703         /* This orphaned rsc needs to be mapped to a container. */
1704         crm_trace("Detected orphaned container filler %s", rsc_id);
1705         pe__set_resource_flags(rsc, pe_rsc_orphan_container_filler);
1706     }
1707     pe__set_resource_flags(rsc, pe_rsc_orphan);
1708     data_set->resources = g_list_append(data_set->resources, rsc);
1709     return rsc;
1710 }
1711 
1712 /*!
1713  * \internal
1714  * \brief Create orphan instance for anonymous clone resource history
1715  */
1716 static pe_resource_t *
1717 create_anonymous_orphan(pe_resource_t *parent, const char *rsc_id,
1718                         pe_node_t *node, pe_working_set_t *data_set)
1719 {
1720     pe_resource_t *top = pe__create_clone_child(parent, data_set);
1721 
1722     // find_rsc() because we might be a cloned group
1723     pe_resource_t *orphan = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
1724 
1725     pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s",
1726                  top->id, parent->id, rsc_id, node->details->uname);
1727     return orphan;
1728 }
1729 
1730 /*!
1731  * \internal
1732  * \brief Check a node for an instance of an anonymous clone
1733  *
1734  * Return a child instance of the specified anonymous clone, in order of
1735  * preference: (1) the instance running on the specified node, if any;
1736  * (2) an inactive instance (i.e. within the total of clone-max instances);
1737  * (3) a newly created orphan (i.e. clone-max instances are already active).
1738  *
1739  * \param[in] data_set  Cluster information
1740  * \param[in] node      Node on which to check for instance
1741  * \param[in] parent    Clone to check
1742  * \param[in] rsc_id    Name of cloned resource in history (without instance)
1743  */
1744 static pe_resource_t *
1745 find_anonymous_clone(pe_working_set_t * data_set, pe_node_t * node, pe_resource_t * parent,
1746                      const char *rsc_id)
1747 {
1748     GList *rIter = NULL;
1749     pe_resource_t *rsc = NULL;
1750     pe_resource_t *inactive_instance = NULL;
1751     gboolean skip_inactive = FALSE;
1752 
1753     CRM_ASSERT(parent != NULL);
1754     CRM_ASSERT(pe_rsc_is_clone(parent));
1755     CRM_ASSERT(!pcmk_is_set(parent->flags, pe_rsc_unique));
1756 
1757     // Check for active (or partially active, for cloned groups) instance
1758     pe_rsc_trace(parent, "Looking for %s on %s in %s", rsc_id, node->details->uname, parent->id);
1759     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1760         GList *locations = NULL;
1761         pe_resource_t *child = rIter->data;
1762 
1763         /* Check whether this instance is already known to be active or pending
1764          * anywhere, at this stage of unpacking. Because this function is called
1765          * for a resource before the resource's individual operation history
1766          * entries are unpacked, locations will generally not contain the
1767          * desired node.
1768          *
1769          * However, there are three exceptions:
1770          * (1) when child is a cloned group and we have already unpacked the
1771          *     history of another member of the group on the same node;
1772          * (2) when we've already unpacked the history of another numbered
1773          *     instance on the same node (which can happen if globally-unique
1774          *     was flipped from true to false); and
1775          * (3) when we re-run calculations on the same data set as part of a
1776          *     simulation.
1777          */
1778         child->fns->location(child, &locations, 2);
1779         if (locations) {
1780             /* We should never associate the same numbered anonymous clone
1781              * instance with multiple nodes, and clone instances can't migrate,
1782              * so there must be only one location, regardless of history.
1783              */
1784             CRM_LOG_ASSERT(locations->next == NULL);
1785 
1786             if (((pe_node_t *)locations->data)->details == node->details) {
1787                 /* This child instance is active on the requested node, so check
1788                  * for a corresponding configured resource. We use find_rsc()
1789                  * instead of child because child may be a cloned group, and we
1790                  * need the particular member corresponding to rsc_id.
1791                  *
1792                  * If the history entry is orphaned, rsc will be NULL.
1793                  */
1794                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1795                 if (rsc) {
1796                     /* If there are multiple instance history entries for an
1797                      * anonymous clone in a single node's history (which can
1798                      * happen if globally-unique is switched from true to
1799                      * false), we want to consider the instances beyond the
1800                      * first as orphans, even if there are inactive instance
1801                      * numbers available.
1802                      */
1803                     if (rsc->running_on) {
1804                         crm_notice("Active (now-)anonymous clone %s has "
1805                                    "multiple (orphan) instance histories on %s",
1806                                    parent->id, node->details->uname);
1807                         skip_inactive = TRUE;
1808                         rsc = NULL;
1809                     } else {
1810                         pe_rsc_trace(parent, "Resource %s, active", rsc->id);
1811                     }
1812                 }
1813             }
1814             g_list_free(locations);
1815 
1816         } else {
1817             pe_rsc_trace(parent, "Resource %s, skip inactive", child->id);
1818             if (!skip_inactive && !inactive_instance
1819                 && !pcmk_is_set(child->flags, pe_rsc_block)) {
1820                 // Remember one inactive instance in case we don't find active
1821                 inactive_instance = parent->fns->find_rsc(child, rsc_id, NULL,
1822                                                           pe_find_clone);
1823 
1824                 /* ... but don't use it if it was already associated with a
1825                  * pending action on another node
1826                  */
1827                 if (inactive_instance && inactive_instance->pending_node
1828                     && (inactive_instance->pending_node->details != node->details)) {
1829                     inactive_instance = NULL;
1830                 }
1831             }
1832         }
1833     }
1834 
1835     if ((rsc == NULL) && !skip_inactive && (inactive_instance != NULL)) {
1836         pe_rsc_trace(parent, "Resource %s, empty slot", inactive_instance->id);
1837         rsc = inactive_instance;
1838     }
1839 
1840     /* If the resource has "requires" set to "quorum" or "nothing", and we don't
1841      * have a clone instance for every node, we don't want to consume a valid
1842      * instance number for unclean nodes. Such instances may appear to be active
1843      * according to the history, but should be considered inactive, so we can
1844      * start an instance elsewhere. Treat such instances as orphans.
1845      *
1846      * An exception is instances running on guest nodes -- since guest node
1847      * "fencing" is actually just a resource stop, requires shouldn't apply.
1848      *
1849      * @TODO Ideally, we'd use an inactive instance number if it is not needed
1850      * for any clean instances. However, we don't know that at this point.
1851      */
1852     if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_needs_fencing)
1853         && (!node->details->online || node->details->unclean)
1854         && !pe__is_guest_node(node)
1855         && !pe__is_universal_clone(parent, data_set)) {
1856 
1857         rsc = NULL;
1858     }
1859 
1860     if (rsc == NULL) {
1861         rsc = create_anonymous_orphan(parent, rsc_id, node, data_set);
1862         pe_rsc_trace(parent, "Resource %s, orphan", rsc->id);
1863     }
1864     return rsc;
1865 }
1866 
1867 static pe_resource_t *
1868 unpack_find_resource(pe_working_set_t * data_set, pe_node_t * node, const char *rsc_id,
1869                      xmlNode * rsc_entry)
1870 {
1871     pe_resource_t *rsc = NULL;
1872     pe_resource_t *parent = NULL;
1873 
1874     crm_trace("looking for %s", rsc_id);
1875     rsc = pe_find_resource(data_set->resources, rsc_id);
1876 
1877     if (rsc == NULL) {
1878         /* If we didn't find the resource by its name in the operation history,
1879          * check it again as a clone instance. Even when clone-max=0, we create
1880          * a single :0 orphan to match against here.
1881          */
1882         char *clone0_id = clone_zero(rsc_id);
1883         pe_resource_t *clone0 = pe_find_resource(data_set->resources, clone0_id);
1884 
1885         if (clone0 && !pcmk_is_set(clone0->flags, pe_rsc_unique)) {
1886             rsc = clone0;
1887             parent = uber_parent(clone0);
1888             crm_trace("%s found as %s (%s)", rsc_id, clone0_id, parent->id);
1889         } else {
1890             crm_trace("%s is not known as %s either (orphan)",
1891                       rsc_id, clone0_id);
1892         }
1893         free(clone0_id);
1894 
1895     } else if (rsc->variant > pe_native) {
1896         crm_trace("Resource history for %s is orphaned because it is no longer primitive",
1897                   rsc_id);
1898         return NULL;
1899 
1900     } else {
1901         parent = uber_parent(rsc);
1902     }
1903 
1904     if (pe_rsc_is_anon_clone(parent)) {
1905 
1906         if (pe_rsc_is_bundled(parent)) {
1907             rsc = pe__find_bundle_replica(parent->parent, node);
1908         } else {
1909             char *base = clone_strip(rsc_id);
1910 
1911             rsc = find_anonymous_clone(data_set, node, parent, base);
1912             free(base);
1913             CRM_ASSERT(rsc != NULL);
1914         }
1915     }
1916 
1917     if (rsc && !pcmk__str_eq(rsc_id, rsc->id, pcmk__str_casei)
1918         && !pcmk__str_eq(rsc_id, rsc->clone_name, pcmk__str_casei)) {
1919 
1920         pcmk__str_update(&rsc->clone_name, rsc_id);
1921         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
1922                      rsc_id, node->details->uname, rsc->id,
1923                      (pcmk_is_set(rsc->flags, pe_rsc_orphan)? " (ORPHAN)" : ""));
1924     }
1925     return rsc;
1926 }
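     /* Informal summary of the lookup above: try the exact history ID first,
      * then the ":0" instance of an anonymous clone, and finally map
      * anonymous-clone (or bundle) history onto whichever instance
      * find_anonymous_clone() or pe__find_bundle_replica() associates with
      * this node, updating clone_name when the history ID and resource ID
      * differ.
      */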
1927 
1928 static pe_resource_t *
1929 process_orphan_resource(xmlNode * rsc_entry, pe_node_t * node, pe_working_set_t * data_set)
1930 {
1931     pe_resource_t *rsc = NULL;
1932     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
1933 
1934     crm_debug("Detected orphan resource %s on %s", rsc_id, node->details->uname);
1935     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
1936     if (rsc == NULL) {
1937         return NULL;
1938     }
1939 
1940     if (!pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)) {
1941         pe__clear_resource_flags(rsc, pe_rsc_managed);
1942 
1943     } else {
1944         CRM_CHECK(rsc != NULL, return NULL);
1945         pe_rsc_trace(rsc, "Added orphan %s", rsc->id);
1946         resource_location(rsc, NULL, -INFINITY, "__orphan_do_not_run__", data_set);
1947     }
1948     return rsc;
1949 }
1950 
1951 static void
1952 process_rsc_state(pe_resource_t * rsc, pe_node_t * node,
1953                   enum action_fail_response on_fail,
1954                   xmlNode * migrate_op, pe_working_set_t * data_set)
1955 {
1956     pe_node_t *tmpnode = NULL;
1957     char *reason = NULL;
1958     enum action_fail_response save_on_fail = action_fail_ignore;
1959 
1960     CRM_ASSERT(rsc);
1961     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
1962                  rsc->id, role2text(rsc->role), node->details->uname, fail2text(on_fail));
1963 
1964     /* process current state */
1965     if (rsc->role != RSC_ROLE_UNKNOWN) {
1966         pe_resource_t *iter = rsc;
1967 
1968         while (iter) {
1969             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
1970                 pe_node_t *n = pe__copy_node(node);
1971 
1972                 pe_rsc_trace(rsc, "%s%s%s known on %s",
1973                              rsc->id,
1974                              ((rsc->clone_name == NULL)? "" : " also known as "),
1975                              ((rsc->clone_name == NULL)? "" : rsc->clone_name),
1976                              n->details->uname);
1977                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
1978             }
1979             if (pcmk_is_set(iter->flags, pe_rsc_unique)) {
1980                 break;
1981             }
1982             iter = iter->parent;
1983         }
1984     }
1985 
1986     /* If a managed resource is believed to be running, but node is down ... */
1987     if (rsc->role > RSC_ROLE_STOPPED
1988         && node->details->online == FALSE
1989         && node->details->maintenance == FALSE
1990         && pcmk_is_set(rsc->flags, pe_rsc_managed)) {
1991 
1992         gboolean should_fence = FALSE;
1993 
1994         /* If this is a guest node, fence it (regardless of whether fencing is
1995          * enabled, because guest node fencing is done by recovery of the
1996          * container resource rather than by the fencer). Mark the resource
1997          * we're processing as failed. When the guest comes back up, its
1998          * operation history in the CIB will be cleared, freeing the affected
1999          * resource to run again once we are sure we know its state.
2000          */
2001         if (pe__is_guest_node(node)) {
2002             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2003             should_fence = TRUE;
2004 
2005         } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
2006             if (pe__is_remote_node(node) && node->details->remote_rsc
2007                 && !pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
2008 
2009                 /* Setting unseen means that fencing of the remote node will
2010                  * occur only if the connection resource is not going to start
2011                  * somewhere. This allows connection resources on a failed
2012                  * cluster node to move to another node without requiring the
2013                  * remote nodes to be fenced as well.
2014                  */
2015                 node->details->unseen = TRUE;
2016                 reason = crm_strdup_printf("%s is active there (fencing will be"
2017                                            " revoked if remote connection can "
2018                                            "be re-established elsewhere)",
2019                                            rsc->id);
2020             }
2021             should_fence = TRUE;
2022         }
2023 
2024         if (should_fence) {
2025             if (reason == NULL) {
2026                reason = crm_strdup_printf("%s is thought to be active there", rsc->id);
2027             }
2028             pe_fence_node(data_set, node, reason, FALSE);
2029         }
2030         free(reason);
2031     }
2032 
2033     /* In order to calculate priority_fencing_delay correctly, save the failure information and pass it to native_add_running(). */
2034     save_on_fail = on_fail;
2035 
2036     if (node->details->unclean) {
2037         /* No extra processing needed
2038          * Also allows resources to be started again after a node is shot
2039          */
2040         on_fail = action_fail_ignore;
2041     }
2042 
2043     switch (on_fail) {
2044         case action_fail_ignore:
2045             /* nothing to do */
2046             break;
2047 
2048         case action_fail_demote:
2049             pe__set_resource_flags(rsc, pe_rsc_failed);
2050             demote_action(rsc, node, FALSE);
2051             break;
2052 
2053         case action_fail_fence:
2054             /* treat it as if it is still running
2055              * but also mark the node as unclean
2056              */
2057             reason = crm_strdup_printf("%s failed there", rsc->id);
2058             pe_fence_node(data_set, node, reason, FALSE);
2059             free(reason);
2060             break;
2061 
2062         case action_fail_standby:
2063             node->details->standby = TRUE;
2064             node->details->standby_onfail = TRUE;
2065             break;
2066 
2067         case action_fail_block:
2068             /* Clearing pe_rsc_managed will prevent any
2069              * actions from being sent for the resource
2070              */
2071             pe__clear_resource_flags(rsc, pe_rsc_managed);
2072             pe__set_resource_flags(rsc, pe_rsc_block);
2073             break;
2074 
2075         case action_fail_migrate:
2076             /* make sure it comes up somewhere else
2077              * or not at all
2078              */
2079             resource_location(rsc, node, -INFINITY, "__action_migration_auto__", data_set);
2080             break;
2081 
2082         case action_fail_stop:
2083             pe__set_next_role(rsc, RSC_ROLE_STOPPED, "on-fail=stop");
2084             break;
2085 
2086         case action_fail_recover:
2087             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2088                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2089                 stop_action(rsc, node, FALSE);
2090             }
2091             break;
2092 
2093         case action_fail_restart_container:
2094             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2095             if (rsc->container && pe_rsc_is_bundled(rsc)) {
2096                 /* A bundle's remote connection can run on a different node than
2097                  * the bundle's container. We don't necessarily know where the
2098                  * container is running yet, so remember it and add a stop
2099                  * action for it later.
2100                  */
2101                 data_set->stop_needed = g_list_prepend(data_set->stop_needed,
2102                                                        rsc->container);
2103             } else if (rsc->container) {
2104                 stop_action(rsc->container, node, FALSE);
2105             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2106                 stop_action(rsc, node, FALSE);
2107             }
2108             break;
2109 
2110         case action_fail_reset_remote:
2111             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2112             if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
2113                 tmpnode = NULL;
2114                 if (rsc->is_remote_node) {
2115                     tmpnode = pe_find_node(data_set->nodes, rsc->id);
2116                 }
2117                 if (tmpnode &&
2118                     pe__is_remote_node(tmpnode) &&
2119                     tmpnode->details->remote_was_fenced == 0) {
2120 
2121                     /* The remote connection resource failed in a way that
2122                      * should result in fencing the remote node.
2123                      */
2124                     pe_fence_node(data_set, tmpnode,
2125                                   "remote connection is unrecoverable", FALSE);
2126                 }
2127             }
2128 
2129             /* Require the stop action regardless of whether fencing is occurring */
2130             if (rsc->role > RSC_ROLE_STOPPED) {
2131                 stop_action(rsc, node, FALSE);
2132             }
2133 
2134             /* if reconnect delay is in use, prevent the connection from exiting the
2135              * "STOPPED" role until the failure is cleared by the delay timeout. */
2136             if (rsc->remote_reconnect_ms) {
2137                 pe__set_next_role(rsc, RSC_ROLE_STOPPED, "remote reset");
2138             }
2139             break;
2140     }
2141 
2142     /* Ensure a remote node connection failure forces an unclean remote node
2143      * to be fenced. By setting unseen = FALSE, the remote node failure will
2144      * result in a fencing operation regardless of whether we're going to
2145      * attempt to reconnect to the remote node in this transition. */
2146     if (pcmk_is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
2147         tmpnode = pe_find_node(data_set->nodes, rsc->id);
2148         if (tmpnode && tmpnode->details->unclean) {
2149             tmpnode->details->unseen = FALSE;
2150         }
2151     }
2152 
2153     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2154         if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
2155             if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2156                 pcmk__config_warn("Detected active orphan %s running on %s",
2157                                   rsc->id, node->details->uname);
2158             } else {
2159                 pcmk__config_warn("Resource '%s' must be stopped manually on "
2160                                   "%s because cluster is configured not to "
2161                                   "stop active orphans",
2162                                   rsc->id, node->details->uname);
2163             }
2164         }
2165 
2166         native_add_running(rsc, node, data_set, (save_on_fail != action_fail_ignore));
2167         switch (on_fail) {
2168             case action_fail_ignore:
2169                 break;
2170             case action_fail_demote:
2171             case action_fail_block:
2172                 pe__set_resource_flags(rsc, pe_rsc_failed);
2173                 break;
2174             default:
2175                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2176                 break;
2177         }
2178 
2179     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
2180         /* Only do this for older status sections that included instance
2181          * numbers. Otherwise, stopped instances will appear as orphans.
2182          */
2183         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
2184         free(rsc->clone_name);
2185         rsc->clone_name = NULL;
2186 
2187     } else {
2188         GList *possible_matches = pe__resource_actions(rsc, node, RSC_STOP,
2189                                                        FALSE);
2190         GList *gIter = possible_matches;
2191 
2192         for (; gIter != NULL; gIter = gIter->next) {
2193             pe_action_t *stop = (pe_action_t *) gIter->data;
2194 
2195             pe__set_action_flags(stop, pe_action_optional);
2196         }
2197 
2198         g_list_free(possible_matches);
2199     }
2200 }
2201 
2202 /* create active recurring operations as optional */
2203 static void
2204 process_recurring(pe_node_t * node, pe_resource_t * rsc,
2205                   int start_index, int stop_index,
2206                   GList *sorted_op_list, pe_working_set_t * data_set)
2207 {
2208     int counter = -1;
2209     const char *task = NULL;
2210     const char *status = NULL;
2211     GList *gIter = sorted_op_list;
2212 
2213     CRM_ASSERT(rsc);
2214     pe_rsc_trace(rsc, "%s: start index = %d, stop index = %d", rsc->id, start_index, stop_index);
2215 
2216     for (; gIter != NULL; gIter = gIter->next) {
2217         xmlNode *rsc_op = (xmlNode *) gIter->data;
2218 
2219         guint interval_ms = 0;
2220         char *key = NULL;
2221         const char *id = ID(rsc_op);
2222 
2223         counter++;
2224 
2225         if (node->details->online == FALSE) {
2226             pe_rsc_trace(rsc, "Skipping %s/%s: node is offline", rsc->id, node->details->uname);
2227             break;
2228 
2229             /* Need to check if there's a monitor for role="Stopped" */
2230         } else if (start_index < stop_index && counter <= stop_index) {
2231             pe_rsc_trace(rsc, "Skipping %s/%s: resource is not active", id, node->details->uname);
2232             continue;
2233 
2234         } else if (counter < start_index) {
2235             pe_rsc_trace(rsc, "Skipping %s/%s: old %d", id, node->details->uname, counter);
2236             continue;
2237         }
2238 
2239         crm_element_value_ms(rsc_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
2240         if (interval_ms == 0) {
2241             pe_rsc_trace(rsc, "Skipping %s/%s: non-recurring", id, node->details->uname);
2242             continue;
2243         }
2244 
2245         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2246         if (pcmk__str_eq(status, "-1", pcmk__str_casei)) {
2247             pe_rsc_trace(rsc, "Skipping %s/%s: status", id, node->details->uname);
2248             continue;
2249         }
2250         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2251         /* create the action */
2252         key = pcmk__op_key(rsc->id, task, interval_ms);
2253         pe_rsc_trace(rsc, "Creating %s/%s", key, node->details->uname);
2254         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
2255     }
2256 }
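     /* For example (hypothetical resource "myrsc"): a recurring ten-second
      * monitor in the history yields the key "myrsc_monitor_10000" via
      * pcmk__op_key(), and custom_action() recreates it as an optional action,
      * telling the scheduler the monitor is already active rather than
      * something new to schedule.
      */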
2257 
2258 void
2259 calculate_active_ops(GList *sorted_op_list, int *start_index, int *stop_index)
2260 {
2261     int counter = -1;
2262     int implied_monitor_start = -1;
2263     int implied_clone_start = -1;
2264     const char *task = NULL;
2265     const char *status = NULL;
2266     GList *gIter = sorted_op_list;
2267 
2268     *stop_index = -1;
2269     *start_index = -1;
2270 
2271     for (; gIter != NULL; gIter = gIter->next) {
2272         xmlNode *rsc_op = (xmlNode *) gIter->data;
2273 
2274         counter++;
2275 
2276         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2277         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2278 
2279         if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)
2280             && pcmk__str_eq(status, "0", pcmk__str_casei)) {
2281             *stop_index = counter;
2282 
2283         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_START, CRMD_ACTION_MIGRATED, NULL)) {
2284             *start_index = counter;
2285 
2286         } else if ((implied_monitor_start <= *stop_index) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
2287             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
2288 
2289             if (pcmk__strcase_any_of(rc, "0", "8", NULL)) {
2290                 implied_monitor_start = counter;
2291             }
2292         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_PROMOTE, CRMD_ACTION_DEMOTE, NULL)) {
2293             implied_clone_start = counter;
2294         }
2295     }
2296 
2297     if (*start_index == -1) {
2298         if (implied_clone_start != -1) {
2299             *start_index = implied_clone_start;
2300         } else if (implied_monitor_start != -1) {
2301             *start_index = implied_monitor_start;
2302         }
2303     }
2304 }
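     /* A hedged walk-through of the indexing above: for a sorted history of
      * start (0), monitor with rc=0 (1), stop (2), start (3), the loop leaves
      * *stop_index == 2 and *start_index == 3, so only operations after the
      * last successful stop count toward the resource's active state.
      */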
2305 
2306 // If resource history entry has shutdown lock, remember lock node and time
2307 static void
2308 unpack_shutdown_lock(xmlNode *rsc_entry, pe_resource_t *rsc, pe_node_t *node,
2309                      pe_working_set_t *data_set)
2310 {
2311     time_t lock_time = 0;   // When lock started (i.e. node shutdown time)
2312 
2313     if ((crm_element_value_epoch(rsc_entry, XML_CONFIG_ATTR_SHUTDOWN_LOCK,
2314                                  &lock_time) == pcmk_ok) && (lock_time != 0)) {
2315 
2316         if ((data_set->shutdown_lock > 0)
2317             && (get_effective_time(data_set)
2318                 > (lock_time + data_set->shutdown_lock))) {
2319             pe_rsc_info(rsc, "Shutdown lock for %s on %s expired",
2320                         rsc->id, node->details->uname);
2321             pe__clear_resource_history(rsc, node, data_set);
2322         } else {
2323             rsc->lock_node = node;
2324             rsc->lock_time = lock_time;
2325         }
2326     }
2327 }
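     /* Informally: data_set->shutdown_lock holds the shutdown-lock-limit in
      * seconds, so a lock taken when the node shut down at time T is honored
      * until the effective time passes T + limit; a limit of 0 means locks
      * never expire here.
      */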
2328 
2329 /*!
2330  * \internal
2331  * \brief Unpack one lrm_resource entry from a node's CIB status
2332  *
2333  * \param[in] node       Node whose status is being unpacked
2334  * \param[in] lrm_resource  lrm_resource XML being unpacked
2335  * \param[in] data_set   Cluster working set
2336  *
2337  * \return Resource corresponding to the entry, or NULL if no operation history
2338  */
2339 static pe_resource_t *
2340 unpack_lrm_resource(pe_node_t *node, xmlNode *lrm_resource,
2341                     pe_working_set_t *data_set)
2342 {
2343     GList *gIter = NULL;
2344     int stop_index = -1;
2345     int start_index = -1;
2346     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
2347 
2348     const char *task = NULL;
2349     const char *rsc_id = ID(lrm_resource);
2350 
2351     pe_resource_t *rsc = NULL;
2352     GList *op_list = NULL;
2353     GList *sorted_op_list = NULL;
2354 
2355     xmlNode *migrate_op = NULL;
2356     xmlNode *rsc_op = NULL;
2357     xmlNode *last_failure = NULL;
2358 
2359     enum action_fail_response on_fail = action_fail_ignore;
2360     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
2361 
2362     if (rsc_id == NULL) {
2363         crm_warn("Ignoring malformed " XML_LRM_TAG_RESOURCE
2364                  " entry without id");
2365         return NULL;
2366     }
2367     crm_trace("Unpacking " XML_LRM_TAG_RESOURCE " for %s on %s",
2368               rsc_id, node->details->uname);
2369 
2370     // Build a list of individual lrm_rsc_op entries, so we can sort them
2371     for (rsc_op = first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP);
2372          rsc_op != NULL; rsc_op = crm_next_same_xml(rsc_op)) {
2373 
2374         op_list = g_list_prepend(op_list, rsc_op);
2375     }
2376 
2377     if (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2378         if (op_list == NULL) {
2379             // If there are no operations, there is nothing to do
2380             return NULL;
2381         }
2382     }
2383 
2384     /* find the resource */
2385     rsc = unpack_find_resource(data_set, node, rsc_id, lrm_resource);
2386     if (rsc == NULL) {
2387         if (op_list == NULL) {
2388             // If there are no operations, there is nothing to do
2389             return NULL;
2390         } else {
2391             rsc = process_orphan_resource(lrm_resource, node, data_set);
2392         }
2393     }
2394     CRM_ASSERT(rsc != NULL);
2395 
2396     // Check whether the resource is "shutdown-locked" to this node
2397     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2398         unpack_shutdown_lock(lrm_resource, rsc, node, data_set);
2399     }
2400 
2401     /* process operations */
2402     saved_role = rsc->role;
2403     rsc->role = RSC_ROLE_UNKNOWN;
2404     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
2405 
2406     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
2407         xmlNode *rsc_op = (xmlNode *) gIter->data;
2408 
2409         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2410         if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2411             migrate_op = rsc_op;
2412         }
2413 
2414         unpack_rsc_op(rsc, node, rsc_op, &last_failure, &on_fail, data_set);
2415     }
2416 
2417     /* create active recurring operations as optional */
2418     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
2419     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
2420 
2421     /* no need to free the contents */
2422     g_list_free(sorted_op_list);
2423 
2424     process_rsc_state(rsc, node, on_fail, migrate_op, data_set);
2425 
2426     if (get_target_role(rsc, &req_role)) {
2427         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
2428             pe__set_next_role(rsc, req_role, XML_RSC_ATTR_TARGET_ROLE);
2429 
2430         } else if (req_role > rsc->next_role) {
2431             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
2432                         " with requested next role %s",
2433                         rsc->id, role2text(rsc->next_role), role2text(req_role));
2434         }
2435     }
2436 
2437     if (saved_role > rsc->role) {
2438         rsc->role = saved_role;
2439     }
2440 
2441     return rsc;
2442 }
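     /* The XML unpacked above typically looks like this (abridged and
      * hypothetical; real entries carry many more attributes):
      *
      *     <lrm_resource id="myrsc" class="ocf" provider="pacemaker" type="Dummy">
      *       <lrm_rsc_op id="myrsc_last_0" operation="start" call-id="3" ... />
      *       <lrm_rsc_op id="myrsc_monitor_10000" operation="monitor" ... />
      *     </lrm_resource>
      */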
2443 
2444 static void
2445 handle_orphaned_container_fillers(xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
2446 {
2447     xmlNode *rsc_entry = NULL;
2448     for (rsc_entry = pcmk__xe_first_child(lrm_rsc_list); rsc_entry != NULL;
2449          rsc_entry = pcmk__xe_next(rsc_entry)) {
2450 
2451         pe_resource_t *rsc;
2452         pe_resource_t *container;
2453         const char *rsc_id;
2454         const char *container_id;
2455 
2456         if (!pcmk__str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, pcmk__str_casei)) {
2457             continue;
2458         }
2459 
2460         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
2461         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2462         if (container_id == NULL || rsc_id == NULL) {
2463             continue;
2464         }
2465 
2466         container = pe_find_resource(data_set->resources, container_id);
2467         if (container == NULL) {
2468             continue;
2469         }
2470 
2471         rsc = pe_find_resource(data_set->resources, rsc_id);
2472         if (rsc == NULL ||
2473             !pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler) ||
2474             rsc->container != NULL) {
2475             continue;
2476         }
2477 
2478         pe_rsc_trace(rsc, "Mapped container of orphaned resource %s to %s",
2479                      rsc->id, container_id);
2480         rsc->container = container;
2481         container->fillers = g_list_append(container->fillers, rsc);
2482     }
2483 }
2484 
2485 /*!
2486  * \internal
2487  * \brief Unpack one node's lrm status section
2488  *
2489  * \param[in] node      Node whose status is being unpacked
2490  * \param[in] xml       CIB node state XML
2491  * \param[in] data_set  Cluster working set
2492  */
2493 static void
2494 unpack_node_lrm(pe_node_t *node, xmlNode *xml, pe_working_set_t *data_set)
2495 {
2496     bool found_orphaned_container_filler = false;
2497 
2498     // Drill down to lrm_resources section
2499     xml = find_xml_node(xml, XML_CIB_TAG_LRM, FALSE);
2500     if (xml == NULL) {
2501         return;
2502     }
2503     xml = find_xml_node(xml, XML_LRM_TAG_RESOURCES, FALSE);
2504     if (xml == NULL) {
2505         return;
2506     }
2507 
2508     // Unpack each lrm_resource entry
2509     for (xmlNode *rsc_entry = first_named_child(xml, XML_LRM_TAG_RESOURCE);
2510          rsc_entry != NULL; rsc_entry = crm_next_same_xml(rsc_entry)) {
2511 
2512         pe_resource_t *rsc = unpack_lrm_resource(node, rsc_entry, data_set);
2513 
2514         if ((rsc != NULL)
2515             && pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
2516             found_orphaned_container_filler = true;
2517         }
2518     }
2519 
2520     /* Now that all resource state has been unpacked for this node, map any
2521      * orphaned container fillers to their container resource.
2522      */
2523     if (found_orphaned_container_filler) {
2524         handle_orphaned_container_fillers(xml, data_set);
2525     }
2526 }
2527 
2528 static void
2529 set_active(pe_resource_t * rsc)
2530 {
2531     pe_resource_t *top = uber_parent(rsc);
2532 
2533     if (top && pcmk_is_set(top->flags, pe_rsc_promotable)) {
2534         rsc->role = RSC_ROLE_UNPROMOTED;
2535     } else {
2536         rsc->role = RSC_ROLE_STARTED;
2537     }
2538 }
2539 
2540 static void
2541 set_node_score(gpointer key, gpointer value, gpointer user_data)
2542 {
2543     pe_node_t *node = value;
2544     int *score = user_data;
2545 
2546     node->weight = *score;
2547 }
2548 
2549 #define STATUS_PATH_MAX 1024
2550 static xmlNode *
2551 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
2552             bool success_only, pe_working_set_t *data_set)
2553 {
2554     int offset = 0;
2555     char xpath[STATUS_PATH_MAX];
2556     xmlNode *xml = NULL;
2557 
2558     offset += snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//node_state[@uname='%s']", node);
2559     offset +=
2560         snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//" XML_LRM_TAG_RESOURCE "[@id='%s']",
2561                  resource);
2562 
2563     /* Need to check against transition_magic too? */
2564     if (source && pcmk__str_eq(op, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
2565         offset +=
2566             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2567                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_target='%s']", op,
2568                      source);
2569     } else if (source && pcmk__str_eq(op, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2570         offset +=
2571             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2572                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_source='%s']", op,
2573                      source);
2574     } else {
2575         offset +=
2576             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2577                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s']", op);
2578     }
2579 
2580     CRM_LOG_ASSERT(offset > 0);
2581     xml = get_xpath_object(xpath, data_set->input, LOG_DEBUG);
2582 
2583     if (xml && success_only) {
2584         int rc = PCMK_OCF_UNKNOWN_ERROR;
2585         int status = PCMK_EXEC_ERROR;
2586 
2587         crm_element_value_int(xml, XML_LRM_ATTR_RC, &rc);
2588         crm_element_value_int(xml, XML_LRM_ATTR_OPSTATUS, &status);
2589         if ((rc != PCMK_OCF_OK) || (status != PCMK_EXEC_DONE)) {
2590             return NULL;
2591         }
2592     }
2593     return xml;
2594 }
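/* For illustration, a query for the last successful stop of resource "myrsc"
 * on node "node1" (hypothetical names) builds an XPath roughly like
 *
 *   //node_state[@uname='node1']//lrm_resource[@id='myrsc']
 *       /lrm_rsc_op[@operation='stop']
 *
 * while the migrate_to/migrate_from variants additionally match on
 * @migrate_target or @migrate_source. With success_only, any match whose
 * rc-code/op-status is not a clean success is discarded afterward.
 */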
2595 
2596 static int
2597 pe__call_id(xmlNode *op_xml)
2598 {
2599     int id = 0;
2600 
2601     if (op_xml) {
2602         crm_element_value_int(op_xml, XML_LRM_ATTR_CALLID, &id);
2603     }
2604     return id;
2605 }
2606 
2607 /*!
2608  * \brief Check whether a stop happened on the same node after some event
2609  *
2610  * \param[in] rsc       Resource being checked
2611  * \param[in] node      Node being checked
2612  * \param[in] xml_op    Event that stop is being compared to
2613  * \param[in] data_set  Cluster working set
2614  *
2615  * \return TRUE if stop happened after event, FALSE otherwise
2616  *
2617  * \note This is really unnecessary, but kept as a safety mechanism. We
2618  *       currently don't save more than one successful event in history, so this
2619  *       only matters when processing really old CIB files that we don't
2620  *       technically support anymore, or as preparation for logging an extended
2621  *       history in the future.
2622  */
2623 static bool
2624 stop_happened_after(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
2625                     pe_working_set_t *data_set)
2626 {
2627     xmlNode *stop_op = find_lrm_op(rsc->id, CRMD_ACTION_STOP,
2628                                    node->details->uname, NULL, TRUE, data_set);
2629 
2630     return (stop_op && (pe__call_id(stop_op) > pe__call_id(xml_op)));
2631 }
2632 
2633 static void
2634 unpack_migrate_to_success(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
2635                           pe_working_set_t *data_set)
2636 {
2637     /* A successful migration sequence is:
2638      *    migrate_to on source node
2639      *    migrate_from on target node
2640      *    stop on source node
2641      *
2642      * If a migrate_to is followed by a stop, the entire migration (successful
2643      * or failed) is complete, and we don't care what happened on the target.
2644      *
2645      * If no migrate_from has happened, the migration is considered to be
2646      * "partial". If the migrate_from failed, make sure the resource gets
2647      * stopped on both source and target (if up).
2648      *
2649      * If the migrate_to and migrate_from both succeeded (which also implies the
2650      * resource is no longer running on the source), but there is no stop, the
2651      * migration is considered to be "dangling". Schedule a stop on the source
2652      * in this case.
2653      */
2654     int from_rc = 0;
2655     int from_status = 0;
2656     pe_node_t *target_node = NULL;
2657     pe_node_t *source_node = NULL;
2658     xmlNode *migrate_from = NULL;
2659     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2660     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2661 
2662     // Sanity check
2663     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2664 
2665     if (stop_happened_after(rsc, node, xml_op, data_set)) {
2666         return;
2667     }
2668 
2669     // Clones are not allowed to migrate, so role can't be promoted
2670     rsc->role = RSC_ROLE_STARTED;
2671 
2672     target_node = pe_find_node(data_set->nodes, target);
2673     source_node = pe_find_node(data_set->nodes, source);
2674 
2675     // Check whether there was a migrate_from action on the target
2676     migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2677                                source, FALSE, data_set);
2678     if (migrate_from) {
2679         crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
2680         crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS, &from_status);
2681         pe_rsc_trace(rsc, "%s op on %s exited with status=%d, rc=%d",
2682                      ID(migrate_from), target, from_status, from_rc);
2683     }
2684 
2685     if (migrate_from && from_rc == PCMK_OCF_OK
2686         && (from_status == PCMK_EXEC_DONE)) {
2687         /* The migrate_to and migrate_from both succeeded, so mark the migration
2688          * as "dangling". This will be used to schedule a stop action on the
2689          * source without affecting the target.
2690          */
2691         pe_rsc_trace(rsc, "Detected dangling migration op: %s on %s", ID(xml_op),
2692                      source);
2693         rsc->role = RSC_ROLE_STOPPED;
2694         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2695 
2696     } else if (migrate_from && (from_status != PCMK_EXEC_PENDING)) { // Failed
2697         if (target_node && target_node->details->online) {
2698             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2699                          target_node->details->online);
2700             native_add_running(rsc, target_node, data_set, TRUE);
2701         }
2702 
2703     } else { // Pending, or complete but erased
2704         if (target_node && target_node->details->online) {
2705             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2706                          target_node->details->online);
2707 
2708             native_add_running(rsc, target_node, data_set, FALSE);
2709             if (source_node && source_node->details->online) {
2710                 /* This is a partial migration: the migrate_to completed
2711                  * successfully on the source, but the migrate_from has not
2712                  * completed. Remember the source and target; if the newly
2713                  * chosen target remains the same when we schedule actions
2714                  * later, we may continue with the migration.
2715                  */
2716                 rsc->partial_migration_target = target_node;
2717                 rsc->partial_migration_source = source_node;
2718             }
2719         } else {
2720             /* Consider it failed here - forces a restart, prevents migration */
2721             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2722             pe__clear_resource_flags(rsc, pe_rsc_allow_migrate);
2723         }
2724     }
2725 }
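/* Worked example (hypothetical call IDs): migrate_to on node1 (call 10) and
 * migrate_from on node2 (call 11) both succeeded, and no later stop was
 * recorded on node1. That is the "dangling" case: the role is set to
 * Stopped, and node1 is added to dangling_migrations so a stop is forced
 * there. If call 11 were instead still pending with both nodes online, this
 * would be a partial migration, remembered via partial_migration_source and
 * partial_migration_target.
 */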
2726 
2727 // Is there an action_name in node_name's rsc history newer than call_id?
2728 static bool
2729 newer_op(pe_resource_t *rsc, const char *action_name, const char *node_name,
2730          int call_id, pe_working_set_t *data_set)
2731 {
2732     xmlNode *action = find_lrm_op(rsc->id, action_name, node_name, NULL, TRUE,
2733                                   data_set);
2734 
2735     return pe__call_id(action) > call_id;
2736 }
2737 
2738 static void
2739 unpack_migrate_to_failure(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
2740                           pe_working_set_t *data_set)
2741 {
2742     int target_stop_id = 0;
2743     int target_migrate_from_id = 0;
2744     xmlNode *target_stop = NULL;
2745     xmlNode *target_migrate_from = NULL;
2746     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2747     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2748 
2749     // Sanity check
2750     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2751 
2752     /* If a migration failed, we have to assume the resource is active. Clones
2753      * are not allowed to migrate, so role can't be promoted.
2754      */
2755     rsc->role = RSC_ROLE_STARTED;
2756 
2757     // Check for stop on the target
2758     target_stop = find_lrm_op(rsc->id, CRMD_ACTION_STOP, target, NULL,
2759                               TRUE, data_set);
2760     target_stop_id = pe__call_id(target_stop);
2761 
2762     // Check for migrate_from on the target
2763     target_migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2764                                       source, TRUE, data_set);
2765     target_migrate_from_id = pe__call_id(target_migrate_from);
2766 
2767     if ((target_stop == NULL) || (target_stop_id < target_migrate_from_id)) {
2768         /* There was no stop on the target, or a stop that happened before a
2769          * migrate_from, so assume the resource is still active on the target
2770          * (if it is up).
2771          */
2772         pe_node_t *target_node = pe_find_node(data_set->nodes, target);
2773 
2774         pe_rsc_trace(rsc, "stop (%d) + migrate_from (%d)",
2775                      target_stop_id, target_migrate_from_id);
2776         if (target_node && target_node->details->online) {
2777             native_add_running(rsc, target_node, data_set, FALSE);
2778         }
2779 
2780     } else if (target_migrate_from == NULL) {
2781         /* We know there was a stop on the target, but there may not have been a
2782          * migrate_from (the stop could have happened before migrate_from was
2783          * scheduled or attempted).
2784          *
2785          * That means this could be a "dangling" migration. But first, check
2786          * whether there is a newer successful stop, start, or migrate_from on
2787          * the source node -- it's possible the failed migration was followed by
2788          * a successful stop, full restart, or migration in the reverse
2789          * direction, in which case we don't want to force a stop.
2790          */
2791         int source_migrate_to_id = pe__call_id(xml_op);
2792 
2793         if (newer_op(rsc, CRMD_ACTION_MIGRATED, source, source_migrate_to_id,
2794                      data_set)
2795             || newer_op(rsc, CRMD_ACTION_START, source, source_migrate_to_id,
2796                      data_set)
2797             || newer_op(rsc, CRMD_ACTION_STOP, source, source_migrate_to_id,
2798                      data_set)) {
2799             return;
2800         }
2801 
2802         // Mark node as having dangling migration so we can force a stop later
2803         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2804     }
2805 }
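/* Example of the call-id ordering above (made-up numbers): a stop on the
 * target at call 9, after a migrate_from at call 7, means the resource was
 * since stopped there, so it is not presumed active on the target. A stop
 * at call 5, before the migrate_from at call 7, would leave it presumed
 * active on the target (if the target is online).
 */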
2806 
2807 static void
2808 unpack_migrate_from_failure(pe_resource_t *rsc, pe_node_t *node,
2809                             xmlNode *xml_op, pe_working_set_t *data_set)
2810 {
2811     xmlNode *source_stop = NULL;
2812     xmlNode *source_migrate_to = NULL;
2813     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2814     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2815 
2816     // Sanity check
2817     CRM_CHECK(source && target && !strcmp(target, node->details->uname), return);
2818 
2819     /* If a migration failed, we have to assume the resource is active. Clones
2820      * are not allowed to migrate, so role can't be promoted.
2821      */
2822     rsc->role = RSC_ROLE_STARTED;
2823 
2824     // Check for a stop on the source
2825     source_stop = find_lrm_op(rsc->id, CRMD_ACTION_STOP, source, NULL,
2826                               TRUE, data_set);
2827 
2828     // Check for a migrate_to on the source
2829     source_migrate_to = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATE,
2830                                     source, target, TRUE, data_set);
2831 
2832     if ((source_stop == NULL)
2833         || (pe__call_id(source_stop) < pe__call_id(source_migrate_to))) {
2834         /* There was no stop on the source, or a stop that happened before
2835          * migrate_to, so assume the resource is still active on the source (if
2836          * it is up).
2837          */
2838         pe_node_t *source_node = pe_find_node(data_set->nodes, source);
2839 
2840         if (source_node && source_node->details->online) {
2841             native_add_running(rsc, source_node, data_set, TRUE);
2842         }
2843     }
2844 }
2845 
2846 static void
2847 record_failed_op(xmlNode *op, const pe_node_t *node,
2848                  const pe_resource_t *rsc, pe_working_set_t *data_set)
2849 {
2850     xmlNode *xIter = NULL;
2851     const char *op_key = crm_element_value(op, XML_LRM_ATTR_TASK_KEY);
2852 
2853     if (node->details->online == FALSE) {
2854         return;
2855     }
2856 
2857     for (xIter = data_set->failed->children; xIter; xIter = xIter->next) {
2858         const char *key = crm_element_value(xIter, XML_LRM_ATTR_TASK_KEY);
2859         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
2860 
2861         if (pcmk__str_eq(op_key, key, pcmk__str_casei) && pcmk__str_eq(uname, node->details->uname, pcmk__str_casei)) {
2862             crm_trace("Skipping duplicate entry %s on %s", op_key, node->details->uname);
2863             return;
2864         }
2865     }
2866 
2867     crm_trace("Adding entry %s on %s", op_key, node->details->uname);
2868     crm_xml_add(op, XML_ATTR_UNAME, node->details->uname);
2869     crm_xml_add(op, XML_LRM_ATTR_RSCID, rsc->id);
2870     add_node_copy(data_set->failed, op);
2871 }
2872 
2873 static const char *get_op_key(xmlNode *xml_op)
2874 {
2875     const char *key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY);
2876     if (key == NULL) {
2877         key = ID(xml_op);
2878     }
2879     return key;
2880 }
2881 
2882 static const char *
2883 last_change_str(xmlNode *xml_op)
2884 {
2885     time_t when;
2886     const char *when_s = NULL;
2887 
2888     if (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
2889                                 &when) == pcmk_ok) {
2890         when_s = pcmk__epoch2str(&when);
2891         if (when_s) {
2892             // Skip day of week to make message shorter
2893             when_s = strchr(when_s, ' ');
2894             if (when_s) {
2895                 ++when_s;
2896             }
2897         }
2898     }
2899     return ((when_s && *when_s)? when_s : "unknown time");
2900 }
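/* For example (illustrative value only), pcmk__epoch2str() output such as
 * "Wed Dec 28 12:00:00 2022" would be shortened here to
 * "Dec 28 12:00:00 2022" by skipping past the first space.
 */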
2901 
2902 /*!
2903  * \internal
2904  * \brief Compare two on-fail values
2905  *
2906  * \param[in] first   One on-fail value to compare
2907  * \param[in] second  The other on-fail value to compare
2908  *
2909  * \return A negative number if second is more severe than first, zero if they
2910  *         are equal, or a positive number if first is more severe than second.
2911  * \note This is only needed until the action_fail_response values can be
2912  *       renumbered at the next API compatibility break.
2913  */
2914 static int
2915 cmp_on_fail(enum action_fail_response first, enum action_fail_response second)
2916 {
2917     switch (first) {
2918         case action_fail_demote:
2919             switch (second) {
2920                 case action_fail_ignore:
2921                     return 1;
2922                 case action_fail_demote:
2923                     return 0;
2924                 default:
2925                     return -1;
2926             }
2927             break;
2928 
2929         case action_fail_reset_remote:
2930             switch (second) {
2931                 case action_fail_ignore:
2932                 case action_fail_demote:
2933                 case action_fail_recover:
2934                     return 1;
2935                 case action_fail_reset_remote:
2936                     return 0;
2937                 default:
2938                     return -1;
2939             }
2940             break;
2941 
2942         case action_fail_restart_container:
2943             switch (second) {
2944                 case action_fail_ignore:
2945                 case action_fail_demote:
2946                 case action_fail_recover:
2947                 case action_fail_reset_remote:
2948                     return 1;
2949                 case action_fail_restart_container:
2950                     return 0;
2951                 default:
2952                     return -1;
2953             }
2954             break;
2955 
2956         default:
2957             break;
2958     }
2959     switch (second) {
2960         case action_fail_demote:
2961             return (first == action_fail_ignore)? -1 : 1;
2962 
2963         case action_fail_reset_remote:
2964             switch (first) {
2965                 case action_fail_ignore:
2966                 case action_fail_demote:
2967                 case action_fail_recover:
2968                     return -1;
2969                 default:
2970                     return 1;
2971             }
2972             break;
2973 
2974         case action_fail_restart_container:
2975             switch (first) {
2976                 case action_fail_ignore:
2977                 case action_fail_demote:
2978                 case action_fail_recover:
2979                 case action_fail_reset_remote:
2980                     return -1;
2981                 default:
2982                     return 1;
2983             }
2984             break;
2985 
2986         default:
2987             break;
2988     }
2989     return first - second;
2990 }
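/* A sketch of the effective severity ordering implemented above, from least
 * to most severe: ignore < demote < recover < reset_remote <
 * restart_container < the remaining values, which fall through to a plain
 * enum comparison. For example, cmp_on_fail(action_fail_demote,
 * action_fail_recover) is negative (demote is less severe), while
 * cmp_on_fail(action_fail_fence, action_fail_recover) is positive.
 */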
2991 
2992 static void
2993 unpack_rsc_op_failure(pe_resource_t *rsc, pe_node_t *node, int rc, xmlNode *xml_op, xmlNode **last_failure,
2994                       enum action_fail_response *on_fail, pe_working_set_t *data_set)
2995 {
2996     bool is_probe = false;
2997     pe_action_t *action = NULL;
2998 
2999     const char *key = get_op_key(xml_op);
3000     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3001     const char *exit_reason = crm_element_value(xml_op,
3002                                                 XML_LRM_ATTR_EXIT_REASON);
3003 
3004     CRM_ASSERT(rsc);
3005     CRM_CHECK(task != NULL, return);
3006 
3007     *last_failure = xml_op;
3008 
3009     is_probe = pcmk_xe_is_probe(xml_op);
3010 
3011     if (exit_reason == NULL) {
3012         exit_reason = "";
3013     }
3014 
3015     if (!pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)
3016         && (rc == PCMK_OCF_NOT_INSTALLED)) {
3017         crm_trace("Unexpected result (%s%s%s) was recorded for "
3018                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
3019                   services_ocf_exitcode_str(rc),
3020                   (*exit_reason? ": " : ""), exit_reason,
3021                   (is_probe? "probe" : task), rsc->id, node->details->uname,
3022                   last_change_str(xml_op), rc, ID(xml_op));
3023     } else {
3024         crm_warn("Unexpected result (%s%s%s) was recorded for "
3025                  "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
3026                  services_ocf_exitcode_str(rc),
3027                  (*exit_reason? ": " : ""), exit_reason,
3028                  (is_probe? "probe" : task), rsc->id, node->details->uname,
3029                  last_change_str(xml_op), rc, ID(xml_op));
3030 
3031         if (is_probe && (rc != PCMK_OCF_OK)
3032             && (rc != PCMK_OCF_NOT_RUNNING)
3033             && (rc != PCMK_OCF_RUNNING_PROMOTED)) {
3034 
3035             /* A failed (not just unexpected) probe result could mean the user
3036              * didn't know resources would be probed even where they can't run.
3037              */
3038             crm_notice("If it is not possible for %s to run on %s, see "
3039                        "the resource-discovery option for location constraints",
3040                        rsc->id, node->details->uname);
3041         }
3042 
3043         record_failed_op(xml_op, node, rsc, data_set);
3044     }
3045 
3046     action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3047     if (cmp_on_fail(*on_fail, action->on_fail) < 0) {
3048         pe_rsc_trace(rsc, "on-fail %s -> %s for %s (%s)", fail2text(*on_fail),
3049                      fail2text(action->on_fail), action->uuid, key);
3050         *on_fail = action->on_fail;
3051     }
3052 
3053     if (!strcmp(task, CRMD_ACTION_STOP)) {
3054         resource_location(rsc, node, -INFINITY, "__stop_fail__", data_set);
3055 
3056     } else if (!strcmp(task, CRMD_ACTION_MIGRATE)) {
3057         unpack_migrate_to_failure(rsc, node, xml_op, data_set);
3058 
3059     } else if (!strcmp(task, CRMD_ACTION_MIGRATED)) {
3060         unpack_migrate_from_failure(rsc, node, xml_op, data_set);
3061 
3062     } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
3063         rsc->role = RSC_ROLE_PROMOTED;
3064 
3065     } else if (!strcmp(task, CRMD_ACTION_DEMOTE)) {
3066         if (action->on_fail == action_fail_block) {
3067             rsc->role = RSC_ROLE_PROMOTED;
3068             pe__set_next_role(rsc, RSC_ROLE_STOPPED,
3069                               "demote with on-fail=block");
3070 
3071         } else if (rc == PCMK_OCF_NOT_RUNNING) {
3072             rsc->role = RSC_ROLE_STOPPED;
3073 
3074         } else {
3075             /* Staying in the promoted role would put the scheduler and
3076              * controller into a loop. Setting the role to unpromoted is not
3077              * dangerous because the resource will be stopped as part of
3078              * recovery, and any promotion will be ordered after that stop.
3079              */
3080             rsc->role = RSC_ROLE_UNPROMOTED;
3081         }
3082     }
3083 
3084     if (is_probe && (rc == PCMK_OCF_NOT_INSTALLED)) {
3085         /* leave stopped */
3086         pe_rsc_trace(rsc, "Leaving %s stopped", rsc->id);
3087         rsc->role = RSC_ROLE_STOPPED;
3088 
3089     } else if (rsc->role < RSC_ROLE_STARTED) {
3090         pe_rsc_trace(rsc, "Setting %s active", rsc->id);
3091         set_active(rsc);
3092     }
3093 
3094     pe_rsc_trace(rsc, "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
3095                  rsc->id, role2text(rsc->role),
3096                  pcmk__btoa(node->details->unclean),
3097                  fail2text(action->on_fail), role2text(action->fail_role));
3098 
3099     if (action->fail_role != RSC_ROLE_STARTED && rsc->next_role < action->fail_role) {
3100         pe__set_next_role(rsc, action->fail_role, "failure");
3101     }
3102 
3103     if (action->fail_role == RSC_ROLE_STOPPED) {
3104         int score = -INFINITY;
3105 
3106         pe_resource_t *fail_rsc = rsc;
3107 
3108         if (fail_rsc->parent) {
3109             pe_resource_t *parent = uber_parent(fail_rsc);
3110 
3111             if (pe_rsc_is_clone(parent)
3112                 && !pcmk_is_set(parent->flags, pe_rsc_unique)) {
3113                 /* For clone resources, if an instance fails an operation
3114                  * with on-fail=stop, all instances must be stopped. Do this
3115                  * by preventing the parent from starting again. */
3116                 fail_rsc = parent;
3117             }
3118         }
3119         crm_notice("%s will not be started under current conditions",
3120                    fail_rsc->id);
3121         /* make sure it doesn't come up again */
3122         if (fail_rsc->allowed_nodes != NULL) {
3123             g_hash_table_destroy(fail_rsc->allowed_nodes);
3124         }
3125         fail_rsc->allowed_nodes = pe__node_list2table(data_set->nodes);
3126         g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
3127     }
3128 
3129     pe_free_action(action);
3130 }
3131 
3132 /*!
3133  * \internal
3134  * \brief Remap informational monitor results and operation status
3135  *
3136  * For monitor results, certain OCF codes provide extended information to the
3137  * user about services that are degraded but not yet failed. Pacemaker must
3138  * treat these as the "normal" result.
3139  *
3140  * For operation status, the action result can be used to determine an
3141  * appropriate status for responding to the action. The status provided by the
3142  * executor is not directly usable, since the executor does not know what was expected.
3143  *
3144  * \param[in]     xml_op     Operation history entry XML from CIB status
3145  * \param[in,out] rsc        Resource that operation history entry is for
3146  * \param[in]     node       Node where operation was executed
3147  * \param[in]     data_set   Current cluster working set
3148  * \param[in,out] on_fail    What should be done about the result
3149  * \param[in]     target_rc  Expected return code of operation
3150  * \param[in,out] rc         Actual return code of operation
3151  * \param[in,out] status     Operation execution status
3152  *
3153  * \note If the result is remapped and the node is not shutting down or failed,
3154  *       the operation will be recorded in the data set's list of failed operations
3155  *       to highlight it for the user.
3156  *
3157  * \note This may update the resource's current and next role.
3158  */
3159 static void
3160 remap_operation(xmlNode *xml_op, pe_resource_t *rsc, pe_node_t *node,
3161                 pe_working_set_t *data_set, enum action_fail_response *on_fail,
3162                 int target_rc, int *rc, int *status) {
3163     bool is_probe = false;
3164     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3165     const char *key = get_op_key(xml_op);
3166     const char *exit_reason = crm_element_value(xml_op,
3167                                                 XML_LRM_ATTR_EXIT_REASON);
3168 
3169     if (pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_none)) {
3170         int remapped_rc = pcmk__effective_rc(*rc);
3171 
3172         if (*rc != remapped_rc) {
3173             crm_trace("Remapping monitor result %d to %d", *rc, remapped_rc);
3174             if (!node->details->shutdown || node->details->online) {
3175                 record_failed_op(xml_op, node, rsc, data_set);
3176             }
3177 
3178             *rc = remapped_rc;
3179         }
3180     }
3181 
3182     if (!pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op)) {
3183         *status = PCMK_EXEC_DONE;
3184         *rc = PCMK_OCF_NOT_RUNNING;
3185     }
3186 
3187     /* If the executor reported an operation status of anything but done or
3188      * error, consider that final. But for done or error, we know better whether
3189      * it should be treated as a failure or not, because we know the expected
3190      * result.
3191      */
3192     if (*status != PCMK_EXEC_DONE && *status != PCMK_EXEC_ERROR) {
3193         return;
3194     }
3195 
3196     CRM_ASSERT(rsc);
3197     CRM_CHECK(task != NULL,
3198               *status = PCMK_EXEC_ERROR; return);
3199 
3200     *status = PCMK_EXEC_DONE;
3201 
3202     if (exit_reason == NULL) {
3203         exit_reason = "";
3204     }
3205 
3206     is_probe = pcmk_xe_is_probe(xml_op);
3207 
3208     if (is_probe) {
3209         task = "probe";
3210     }
3211 
3212     if (target_rc < 0) {
3213         /* Pre-1.0 Pacemaker versions, and Pacemaker 1.1.6 or earlier with
3214          * Heartbeat 2.0.7 or earlier as the cluster layer, did not include the
3215          * target_rc in the transition key, which (along with the similar case
3216          * of a corrupted transition key in the CIB) will be reported to this
3217          * function as -1. Pacemaker 2.0+ does not support rolling upgrades from
3218          * those versions or processing of saved CIB files from those versions,
3219          * so we do not need to care much about this case.
3220          */
3221         *status = PCMK_EXEC_ERROR;
3222         crm_warn("Expected result not found for %s on %s (corrupt or obsolete CIB?)",
3223                  key, node->details->uname);
3224 
3225     } else if (target_rc != *rc) {
3226         *status = PCMK_EXEC_ERROR;
3227         pe_rsc_debug(rsc, "%s on %s: expected %d (%s), got %d (%s%s%s)",
3228                      key, node->details->uname,
3229                      target_rc, services_ocf_exitcode_str(target_rc),
3230                      *rc, services_ocf_exitcode_str(*rc),
3231                      (*exit_reason? ": " : ""), exit_reason);
3232     }
3233 
3234     switch (*rc) {
3235         case PCMK_OCF_OK:
3236             if (is_probe && (target_rc == PCMK_OCF_NOT_RUNNING)) {
3237                 *status = PCMK_EXEC_DONE;
3238                 pe_rsc_info(rsc, "Probe found %s active on %s at %s",
3239                             rsc->id, node->details->uname,
3240                             last_change_str(xml_op));
3241             }
3242             break;
3243 
3244         case PCMK_OCF_NOT_RUNNING:
3245             if (is_probe || (target_rc == *rc)
3246                 || !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
3247 
3248                 *status = PCMK_EXEC_DONE;
3249                 rsc->role = RSC_ROLE_STOPPED;
3250 
3251                 /* clear any previous failure actions */
3252                 *on_fail = action_fail_ignore;
3253                 pe__set_next_role(rsc, RSC_ROLE_UNKNOWN, "not running");
3254             }
3255             break;
3256 
3257         case PCMK_OCF_RUNNING_PROMOTED:
3258             if (is_probe && (*rc != target_rc)) {
3259                 *status = PCMK_EXEC_DONE;
3260                 pe_rsc_info(rsc,
3261                             "Probe found %s active and promoted on %s at %s",
3262                             rsc->id, node->details->uname,
3263                             last_change_str(xml_op));
3264             }
3265             rsc->role = RSC_ROLE_PROMOTED;
3266             break;
3267 
3268         case PCMK_OCF_DEGRADED_PROMOTED:
3269         case PCMK_OCF_FAILED_PROMOTED:
3270             rsc->role = RSC_ROLE_PROMOTED;
3271             *status = PCMK_EXEC_ERROR;
3272             break;
3273 
3274         case PCMK_OCF_NOT_CONFIGURED:
3275             *status = PCMK_EXEC_ERROR_FATAL;
3276             break;
3277 
3278         case PCMK_OCF_UNIMPLEMENT_FEATURE: {
3279             guint interval_ms = 0;
3280             crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3281 
3282             if (interval_ms > 0) {
3283                 *status = PCMK_EXEC_NOT_SUPPORTED;
3284                 break;
3285             }
3286             // fall through
3287         }
3288 
3289         case PCMK_OCF_NOT_INSTALLED:
3290         case PCMK_OCF_INVALID_PARAM:
3291         case PCMK_OCF_INSUFFICIENT_PRIV:
3292             if (!pe_can_fence(data_set, node)
3293                 && !strcmp(task, CRMD_ACTION_STOP)) {
3294                 /* If a stop fails and we can't fence, there's nothing else we can do */
3295                 pe_proc_err("No further recovery can be attempted for %s "
3296                             "because %s on %s failed (%s%s%s) at %s "
3297                             CRM_XS " rc=%d id=%s", rsc->id, task,
3298                             node->details->uname, services_ocf_exitcode_str(*rc),
3299                             (*exit_reason? ": " : ""), exit_reason,
3300                             last_change_str(xml_op), *rc, ID(xml_op));
3301                 pe__clear_resource_flags(rsc, pe_rsc_managed);
3302                 pe__set_resource_flags(rsc, pe_rsc_block);
3303             }
3304             *status = PCMK_EXEC_ERROR_HARD;
3305             break;
3306 
3307         default:
3308             if (*status == PCMK_EXEC_DONE) {
3309                 crm_info("Treating unknown exit status %d from %s of %s "
3310                          "on %s at %s as failure",
3311                          *rc, task, rsc->id, node->details->uname,
3312                          last_change_str(xml_op));
3313                 *status = PCMK_EXEC_ERROR;
3314             }
3315             break;
3316     }
3317 
3318     pe_rsc_trace(rsc, "Remapped %s status to '%s'",
3319                  key, pcmk_exec_status_str(*status));
3320 }
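/* Two illustrative remappings (hypothetical history entries): a recurring
 * monitor returning PCMK_OCF_DEGRADED is remapped to PCMK_OCF_OK by
 * pcmk__effective_rc() and recorded in the failed-operations list so the
 * user still sees it; a probe that finds the resource running (PCMK_OCF_OK)
 * when PCMK_OCF_NOT_RUNNING was expected keeps status PCMK_EXEC_DONE,
 * because an unexpectedly active resource is handled by later scheduling
 * rather than treated as an execution error.
 */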
3321 
3322 // Return TRUE if a start or monitor last failure should be cleared due to parameter changes
3323 static bool
3324 should_clear_for_param_change(xmlNode *xml_op, const char *task,
3325                               pe_resource_t *rsc, pe_node_t *node,
3326                               pe_working_set_t *data_set)
3327 {
3328     if (!strcmp(task, "start") || !strcmp(task, "monitor")) {
3329 
3330         if (pe__bundle_needs_remote_name(rsc, data_set)) {
3331             /* We haven't allocated resources yet, so we can't reliably
3332              * substitute addr parameters for the REMOTE_CONTAINER_HACK.
3333              * When that's needed, defer the check until later.
3334              */
3335             pe__add_param_check(xml_op, rsc, node, pe_check_last_failure,
3336                                 data_set);
3337 
3338         } else {
3339             op_digest_cache_t *digest_data = NULL;
3340 
3341             digest_data = rsc_action_digest_cmp(rsc, xml_op, node, data_set);
3342             switch (digest_data->rc) {
3343                 case RSC_DIGEST_UNKNOWN:
3344                     crm_trace("Resource %s history entry %s on %s"
3345                               " has no digest to compare",
3346                               rsc->id, get_op_key(xml_op), node->details->id);
3347                     break;
3348                 case RSC_DIGEST_MATCH:
3349                     break;
3350                 default:
3351                     return TRUE;
3352             }
3353         }
3354     }
3355     return FALSE;
3356 }
3357 
3358 // Order action after fencing of remote node, given connection rsc
3359 static void
3360 order_after_remote_fencing(pe_action_t *action, pe_resource_t *remote_conn,
3361                            pe_working_set_t *data_set)
3362 {
3363     pe_node_t *remote_node = pe_find_node(data_set->nodes, remote_conn->id);
3364 
3365     if (remote_node) {
3366         pe_action_t *fence = pe_fence_op(remote_node, NULL, TRUE, NULL,
3367                                          FALSE, data_set);
3368 
3369         order_actions(fence, action, pe_order_implies_then);
3370     }
3371 }
3372 
3373 static bool
3374 should_ignore_failure_timeout(pe_resource_t *rsc, xmlNode *xml_op,
3375                               const char *task, guint interval_ms,
3376                               bool is_last_failure, pe_working_set_t *data_set)
3377 {
3378     /* Clearing failures of recurring monitors has special concerns. The
3379      * executor reports only changes in the monitor result, so if the
3380      * monitor is still active and still getting the same failure result,
3381      * that will go undetected after the failure is cleared.
3382      *
3383      * Also, the operation history will have the time when the recurring
3384      * monitor result changed to the given code, not the time when the
3385      * result last happened.
3386      *
3387      * @TODO We probably should clear such failures only when the failure
3388      * timeout has passed since the last occurrence of the failed result.
3389      * However we don't record that information. We could maybe approximate
3390      * that by clearing only if there is a more recent successful monitor or
3391      * stop result, but we don't even have that information at this point
3392      * since we are still unpacking the resource's operation history.
3393      *
3394      * This is especially important for remote connection resources with a
3395      * reconnect interval, so in that case, we skip clearing failures
3396      * if the remote node hasn't been fenced.
3397      */
3398     if (rsc->remote_reconnect_ms
3399         && pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3400         && (interval_ms != 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3401 
3402         pe_node_t *remote_node = pe_find_node(data_set->nodes, rsc->id);
3403 
3404         if (remote_node && !remote_node->details->remote_was_fenced) {
3405             if (is_last_failure) {
3406                 crm_info("Waiting to clear monitor failure for remote node %s"
3407                          " until fencing has occurred", rsc->id);
3408             }
3409             return TRUE;
3410         }
3411     }
3412     return FALSE;
3413 }
3414 
3415 /*!
3416  * \internal
3417  * \brief Check operation age and schedule failure clearing when appropriate
3418  *
3419  * This function has two distinct purposes. The first is to check whether an
3420  * operation history entry is expired (i.e. the resource has a failure timeout,
3421  * the entry is older than the timeout, and the resource either has no fail
3422  * count or its fail count is entirely older than the timeout). The second is to
3423  * schedule fail count clearing when appropriate: when the operation is
3424  * expired and the fail count has expired with it, when it is an expired
3425  * last_failure for a remote connection resource with a reconnect interval,
3426  * or when it is a still-unexpired last_failure for a start or monitor
3427  * whose parameters have changed since the operation was run.
3428  *
3429  * \param[in] rsc       Resource that operation happened to
3430  * \param[in] node      Node that operation happened on
3431  * \param[in] rc        Actual result of operation
3432  * \param[in] xml_op    Operation history entry XML
3433  * \param[in] data_set  Current working set
3434  *
3435  * \return TRUE if operation history entry is expired, FALSE otherwise
3436  */
3437 static bool
3438 check_operation_expiry(pe_resource_t *rsc, pe_node_t *node, int rc,
3439                        xmlNode *xml_op, pe_working_set_t *data_set)
3440 {
3441     bool expired = FALSE;
3442     bool is_last_failure = pcmk__ends_with(ID(xml_op), "_last_failure_0");
3443     time_t last_run = 0;
3444     guint interval_ms = 0;
3445     int unexpired_fail_count = 0;
3446     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3447     const char *clear_reason = NULL;
3448 
3449     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3450 
3451     if ((rsc->failure_timeout > 0)
3452         && (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
3453                                     &last_run) == 0)) {
3454 
3455         // Resource has a failure-timeout, and history entry has a timestamp
3456 
3457         time_t now = get_effective_time(data_set);
3458         time_t last_failure = 0;
3459 
3460         // Is this particular operation history older than the failure timeout?
3461         if ((now >= (last_run + rsc->failure_timeout))
3462             && !should_ignore_failure_timeout(rsc, xml_op, task, interval_ms,
3463                                               is_last_failure, data_set)) {
3464             expired = TRUE;
3465         }
3466 
3467         // Does the resource as a whole have an unexpired fail count?
3468         unexpired_fail_count = pe_get_failcount(node, rsc, &last_failure,
3469                                                 pe_fc_effective, xml_op,
3470                                                 data_set);
3471 
3472         // Update scheduler recheck time according to *last* failure
3473         crm_trace("%s@%lld is %sexpired @%lld with unexpired_failures=%d timeout=%ds"
3474                   " last-failure@%lld",
3475                   ID(xml_op), (long long) last_run, (expired? "" : "not "),
3476                   (long long) now, unexpired_fail_count, rsc->failure_timeout,
3477                   (long long) last_failure);
3478         last_failure += rsc->failure_timeout + 1;
3479         if (unexpired_fail_count && (now < last_failure)) {
3480             pe__update_recheck_time(last_failure, data_set);
3481         }
3482     }
3483 
3484     if (expired) {
3485         if (pe_get_failcount(node, rsc, NULL, pe_fc_default, xml_op, data_set)) {
3486 
3487             // There is a fail count ignoring timeout
3488 
3489             if (unexpired_fail_count == 0) {
3490                 // There is no fail count considering timeout
3491                 clear_reason = "it expired";
3492 
3493             } else {
3494                 /* This operation is old, but there is an unexpired fail count.
3495                  * In a properly functioning cluster, this should only be
3496                  * possible if this operation is not a failure (otherwise the
3497                  * fail count should be expired too), so this is really just a
3498                  * failsafe.
3499                  */
3500                 expired = FALSE;
3501             }
3502 
3503         } else if (is_last_failure && rsc->remote_reconnect_ms) {
3504             /* Clear any expired last failure when reconnect interval is set,
3505              * even if there is no fail count.
3506              */
3507             clear_reason = "reconnect interval is set";
3508         }
3509     }
3510 
3511     if (!expired && is_last_failure
3512         && should_clear_for_param_change(xml_op, task, rsc, node, data_set)) {
3513         clear_reason = "resource parameters have changed";
3514     }
3515 
3516     if (clear_reason != NULL) {
3517         // Schedule clearing of the fail count
3518         pe_action_t *clear_op = pe__clear_failcount(rsc, node, clear_reason,
3519                                                     data_set);
3520 
3521         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3522             && rsc->remote_reconnect_ms) {
3523             /* If we're clearing a remote connection due to a reconnect
3524              * interval, we want to wait until any scheduled fencing
3525              * completes.
3526              *
3527              * We could limit this to remote_node->details->unclean, but at
3528              * this point, that's always true (it won't be reliable until
3529              * after unpack_node_history() is done).
3530              */
3531             crm_info("Clearing %s failure will wait until any scheduled "
3532                      "fencing of %s completes", task, rsc->id);
3533             order_after_remote_fencing(clear_op, rsc, data_set);
3534         }
3535     }
3536 
3537     if (expired && (interval_ms == 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3538         switch(rc) {
3539             case PCMK_OCF_OK:
3540             case PCMK_OCF_NOT_RUNNING:
3541             case PCMK_OCF_RUNNING_PROMOTED:
3542             case PCMK_OCF_DEGRADED:
3543             case PCMK_OCF_DEGRADED_PROMOTED:
3544                 // Don't expire probes that return these values
3545                 expired = FALSE;
3546                 break;
3547         }
3548     }
3549 
3550     return expired;
3551 }
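/* Worked example (made-up times): with failure-timeout=60s and an entry
 * last changed at t=1000, the entry expires once the effective time reaches
 * t=1060. If an unexpired fail count remains and the most recent failure
 * was at t=1030, the cluster recheck time is moved up to t=1091 (last
 * failure + timeout + 1) so that expiry is re-evaluated then.
 */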
3552 
3553 int pe__target_rc_from_xml(xmlNode *xml_op)
3554 {
3555     int target_rc = 0;
3556     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
3557 
3558     if (key == NULL) {
3559         return -1;
3560     }
3561     decode_transition_key(key, NULL, NULL, NULL, &target_rc);
3562     return target_rc;
3563 }
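/* For reference, a transition key has the form
 * <action>:<transition>:<target-rc>:<uuid>, for example (made-up values)
 * "10:5:0:01234567-89ab-cdef-0123-456789abcdef", from which
 * decode_transition_key() extracts the expected rc (0 here).
 */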
3564 
3565 static enum action_fail_response
3566 get_action_on_fail(pe_resource_t *rsc, const char *key, const char *task, pe_working_set_t *data_set)
3567 {
3568     enum action_fail_response result = action_fail_recover;
3569     pe_action_t *action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3570 
3571     result = action->on_fail;
3572     pe_free_action(action);
3573 
3574     return result;
3575 }
3576 
3577 static void
3578 update_resource_state(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op, const char *task, int rc,
3579                       xmlNode *last_failure, enum action_fail_response *on_fail, pe_working_set_t *data_set)
3580 {
3581     gboolean clear_past_failure = FALSE;
3582 
3583     CRM_ASSERT(rsc);
3584     CRM_ASSERT(xml_op);
3585 
3586     if (rc == PCMK_OCF_NOT_INSTALLED || (!pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op))) {
3587         rsc->role = RSC_ROLE_STOPPED;
3588 
3589     } else if (rc == PCMK_OCF_NOT_RUNNING) {
3590         clear_past_failure = TRUE;
3591 
3592     } else if (pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3593         if (last_failure) {
3594             const char *op_key = get_op_key(xml_op);
3595             const char *last_failure_key = get_op_key(last_failure);
3596 
3597             if (pcmk__str_eq(op_key, last_failure_key, pcmk__str_casei)) {
3598                 clear_past_failure = TRUE;
3599             }
3600         }
3601 
3602         if (rsc->role < RSC_ROLE_STARTED) {
3603             set_active(rsc);
3604         }
3605 
3606     } else if (pcmk__str_eq(task, CRMD_ACTION_START, pcmk__str_casei)) {
3607         rsc->role = RSC_ROLE_STARTED;
3608         clear_past_failure = TRUE;
3609 
3610     } else if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)) {
3611         rsc->role = RSC_ROLE_STOPPED;
3612         clear_past_failure = TRUE;
3613 
3614     } else if (pcmk__str_eq(task, CRMD_ACTION_PROMOTE, pcmk__str_casei)) {
3615         rsc->role = RSC_ROLE_PROMOTED;
3616         clear_past_failure = TRUE;
3617 
3618     } else if (pcmk__str_eq(task, CRMD_ACTION_DEMOTE, pcmk__str_casei)) {
3619 
3620         if (*on_fail == action_fail_demote) {
3621             // Demote clears an error only if on-fail=demote
3622             clear_past_failure = TRUE;
3623         }
3624         rsc->role = RSC_ROLE_UNPROMOTED;
3625 
3626     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
3627         rsc->role = RSC_ROLE_STARTED;
3628         clear_past_failure = TRUE;
3629 
3630     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
3631         unpack_migrate_to_success(rsc, node, xml_op, data_set);
3632 
3633     } else if (rsc->role < RSC_ROLE_STARTED) {
3634         pe_rsc_trace(rsc, "%s active on %s", rsc->id, node->details->uname);
3635         set_active(rsc);
3636     }
3637 
3638     /* clear any previous failure actions */
3639     if (clear_past_failure) {
3640         switch (*on_fail) {
3641             case action_fail_stop:
3642             case action_fail_fence:
3643             case action_fail_migrate:
3644             case action_fail_standby:
3645                 pe_rsc_trace(rsc, "%s.%s is not cleared by a completed stop",
3646                              rsc->id, fail2text(*on_fail));
3647                 break;
3648 
3649             case action_fail_block:
3650             case action_fail_ignore:
3651             case action_fail_demote:
3652             case action_fail_recover:
3653             case action_fail_restart_container:
3654                 *on_fail = action_fail_ignore;
3655                 pe__set_next_role(rsc, RSC_ROLE_UNKNOWN, "clear past failures");
3656                 break;
3657             case action_fail_reset_remote:
3658                 if (rsc->remote_reconnect_ms == 0) {
3659                     /* With no reconnect interval, the connection is allowed to
3660                      * start again after the remote node is fenced and
3661                      * completely stopped. (With a reconnect interval, we wait
3662                      * for the failure to be cleared entirely before attempting
3663                      * to reconnect.)
3664                      */
3665                     *on_fail = action_fail_ignore;
3666                     pe__set_next_role(rsc, RSC_ROLE_UNKNOWN,
3667                                       "clear past failures and reset remote");
3668                 }
3669                 break;
3670         }
3671     }
3672 }
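/* In short: start and migrate_from leave the role Started, stop leaves it
 * Stopped, promote Promoted, and demote Unpromoted. Most successful results
 * also clear earlier failure handling, except for the "sticky" on-fail
 * values (stop, fence, migrate, standby) noted in the switch above, and
 * reset_remote when a reconnect interval is configured.
 */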
3673 
3674 static void
3675 unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
3676               xmlNode **last_failure, enum action_fail_response *on_fail,
3677               pe_working_set_t *data_set)
3678 {
3679     int rc = 0;
3680     int old_rc = 0;
3681     int task_id = 0;
3682     int target_rc = 0;
3683     int old_target_rc = 0;
3684     int status = PCMK_EXEC_UNKNOWN;
3685     guint interval_ms = 0;
3686     const char *task = NULL;
3687     const char *task_key = NULL;
3688     const char *exit_reason = NULL;
3689     bool expired = false;
3690     pe_resource_t *parent = rsc;
3691     enum action_fail_response failure_strategy = action_fail_recover;
3692     bool maskable_probe_failure = false;
3693 
3694     CRM_CHECK(rsc && node && xml_op, return);
3695 
3696     target_rc = pe__target_rc_from_xml(xml_op);
3697     task_key = get_op_key(xml_op);
3698     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3699     exit_reason = crm_element_value(xml_op, XML_LRM_ATTR_EXIT_REASON);
3700     if (exit_reason == NULL) {
3701         exit_reason = "";
3702     }
3703 
3704     crm_element_value_int(xml_op, XML_LRM_ATTR_RC, &rc);
3705     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
3706     crm_element_value_int(xml_op, XML_LRM_ATTR_OPSTATUS, &status);
3707     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3708 
3709     CRM_CHECK(task != NULL, return);
3710     CRM_CHECK((status >= PCMK_EXEC_PENDING) && (status <= PCMK_EXEC_MAX),
3711               return);
3712 
3713     if (!strcmp(task, CRMD_ACTION_NOTIFY) ||
3714         !strcmp(task, CRMD_ACTION_METADATA)) {
3715         /* safe to ignore these */
3716         return;
3717     }
3718 
3719     if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
3720         parent = uber_parent(rsc);
3721     }
3722 
3723     pe_rsc_trace(rsc, "Unpacking task %s/%s (call_id=%d, status=%d, rc=%d) on %s (role=%s)",
3724                  task_key, task, task_id, status, rc, node->details->uname, role2text(rsc->role));
3725 
3726     if (node->details->unclean) {
3727         pe_rsc_trace(rsc, "Node %s (where %s is running) is unclean."
3728                      " Further action depends on the value of the stop's on-fail attribute",
3729                      node->details->uname, rsc->id);
3730     }
3731 
3732     /* It should be possible to call remap_operation() first then call
3733      * check_operation_expiry() only if rc != target_rc, because there should
3734      * never be a fail count without at least one unexpected result in the
3735      * resource history. That would be more efficient by avoiding having to call
3736      * check_operation_expiry() for expected results.
3737      *
3738      * However, we do have such configurations in the scheduler regression
3739      * tests, even if it shouldn't be possible with the current code. It's
3740      * probably a good idea anyway, but that would require updating the test
3741      * inputs to something currently possible.
3742      */
3743 
3744     if ((status != PCMK_EXEC_NOT_INSTALLED)
3745         && check_operation_expiry(rsc, node, rc, xml_op, data_set)) {
3746         expired = true;
3747     }
3748 
3749     old_rc = rc;
3750     old_target_rc = target_rc;
3751 
3752     remap_operation(xml_op, rsc, node, data_set, on_fail, target_rc,
3753                     &rc, &status);
3754 
3755     maskable_probe_failure = !pe_rsc_is_bundled(rsc) && pcmk_xe_mask_probe_failure(xml_op);
3756 
3757     if (expired && maskable_probe_failure && old_rc != old_target_rc) {
3758         if (rsc->role <= RSC_ROLE_STOPPED) {
3759             rsc->role = RSC_ROLE_UNKNOWN;
3760         }
3761 
3762         goto done;
3763 
3764     } else if (expired && (rc != target_rc)) {
3765         const char *magic = crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC);
3766 
3767         if (interval_ms == 0) {
3768             crm_notice("Ignoring expired %s failure on %s "
3769                        CRM_XS " actual=%d expected=%d magic=%s",
3770                        task_key, node->details->uname, rc, target_rc, magic);
3771             goto done;
3772 
3773         } else if (node->details->online && node->details->unclean == FALSE) {
3774             /* Reschedule the recurring monitor. schedule_cancel() won't work at
3775              * this stage, so as a hacky workaround, forcibly change the restart
3776              * digest so pcmk__check_action_config() does what we want later.
3777              *
3778              * @TODO We should skip this if there is a newer successful monitor.
3779              *       Also, this causes rescheduling only if the history entry
3780              *       has an op-digest (which the expire-non-blocked-failure
3781              *       scheduler regression test doesn't, but that may not be a
3782              *       realistic scenario in production).
3783              */
3784             crm_notice("Rescheduling %s after failure expired on %s "
3785                        CRM_XS " actual=%d expected=%d magic=%s",
3786                        task_key, node->details->uname, rc, target_rc, magic);
3787             crm_xml_add(xml_op, XML_LRM_ATTR_RESTART_DIGEST, "calculated-failure-timeout");
3788             goto done;
3789         }
3790     }
3791 
3792     if (maskable_probe_failure) {
3793         crm_notice("Treating probe result '%s' for %s on %s as 'not running'",
3794                    services_ocf_exitcode_str(old_rc), rsc->id, node->details->uname);
3795         update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure,
3796                               on_fail, data_set);
3797         crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
3798 
3799         record_failed_op(xml_op, node, rsc, data_set);
3800         resource_location(parent, node, -INFINITY, "masked-probe-failure", data_set);
3801         goto done;
3802     }
3803 
3804     switch (status) {
3805         case PCMK_EXEC_CANCELLED:
3806             // Should never happen
3807             pe_err("Resource history contains cancellation '%s' "
3808                    "(%s of %s on %s at %s)",
3809                    ID(xml_op), task, rsc->id, node->details->uname,
3810                    last_change_str(xml_op));
3811             goto done;
3812 
3813         case PCMK_EXEC_PENDING:
3814             if (!strcmp(task, CRMD_ACTION_START)) {
3815                 pe__set_resource_flags(rsc, pe_rsc_start_pending);
3816                 set_active(rsc);
3817 
3818             } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
3819                 rsc->role = RSC_ROLE_PROMOTED;
3820 
3821             } else if (!strcmp(task, CRMD_ACTION_MIGRATE) && node->details->unclean) {
3822                 /* If a pending migrate_to action is out on an unclean node,
3823                  * we have to force the stop action on the target. */
3824                 const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
3825                 pe_node_t *target = pe_find_node(data_set->nodes, migrate_target);
3826                 if (target) {
3827                     stop_action(rsc, target, FALSE);
3828                 }
3829             }
3830 
3831             if (rsc->pending_task == NULL) {
3832                 if ((interval_ms != 0) || strcmp(task, CRMD_ACTION_STATUS)) {
3833                     rsc->pending_task = strdup(task);
3834                     rsc->pending_node = node;
3835                 } else {
3836                     /* Pending probes are not printed, even if pending
3837                      * operations are requested. If someone ever requests that
3838                      * behavior, enable the below and the corresponding part of
3839                      * native.c:native_pending_task().
3840                      */
3841 #if 0
3842                     rsc->pending_task = strdup("probe");
3843                     rsc->pending_node = node;
3844 #endif
3845                 }
3846             }
3847             goto done;
3848 
3849         case PCMK_EXEC_DONE:
3850             pe_rsc_trace(rsc, "%s of %s on %s completed at %s " CRM_XS " id=%s",
3851                          task, rsc->id, node->details->uname,
3852                          last_change_str(xml_op), ID(xml_op));
3853             update_resource_state(rsc, node, xml_op, task, rc, *last_failure, on_fail, data_set);
3854             goto done;
3855 
3856         case PCMK_EXEC_NOT_INSTALLED:
3857             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3858             if (failure_strategy == action_fail_ignore) {
3859                 crm_warn("Cannot ignore failed %s of %s on %s: "
3860                          "Resource agent doesn't exist "
3861                          CRM_XS " status=%d rc=%d id=%s",
3862                          task, rsc->id, node->details->uname, status, rc,
3863                          ID(xml_op));
3864                 /* Also have it marked as pe_rsc_failed later, so it is displayed as "FAILED" */
3865                 *on_fail = action_fail_migrate;
3866             }
3867             resource_location(parent, node, -INFINITY, "hard-error", data_set);
3868             unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
3869             goto done;
3870 
3871         case PCMK_EXEC_NOT_CONNECTED:
3872             if (pe__is_guest_or_remote_node(node)
3873                 && pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_managed)) {
3874                 /* We should never get into a situation where a managed remote
3875                  * connection resource is considered OK but a resource action
3876                  * behind the connection gets a "not connected" status. But as a
3877                  * fail-safe in case a bug or unusual circumstances do lead to
3878                  * that, ensure the remote connection is considered failed.
3879                  */
3880                 pe__set_resource_flags(node->details->remote_rsc,
3881                                        pe_rsc_failed|pe_rsc_stop);
3882             }
3883             break; // Not done, do error handling
3884 
3885         case PCMK_EXEC_ERROR:
3886         case PCMK_EXEC_ERROR_HARD:
3887         case PCMK_EXEC_ERROR_FATAL:
3888         case PCMK_EXEC_TIMEOUT:
3889         case PCMK_EXEC_NOT_SUPPORTED:
3890         case PCMK_EXEC_INVALID:
3891             break; // Not done, do error handling
3892 
3893         case PCMK_EXEC_NO_FENCE_DEVICE:
3894         case PCMK_EXEC_NO_SECRETS:
3895             status = PCMK_EXEC_ERROR_HARD;
3896             break; // Not done, do error handling
3897     }
3898 
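         /* Anything that reaches this point is a failure that was not fully
          * handled above. Honor the resource's on-fail policy: failures set
          * to be ignored (or stop failures handled by restarting the
          * container) are recorded but treated as successes; everything else
          * is unpacked as a real failure.
          */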
3899     failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3900     if ((failure_strategy == action_fail_ignore)
3901         || (failure_strategy == action_fail_restart_container
3902             && !strcmp(task, CRMD_ACTION_STOP))) {
3903 
3904         crm_warn("Pretending failed %s (%s%s%s) of %s on %s at %s "
3905                  "succeeded " CRM_XS " rc=%d id=%s",
3906                  task, services_ocf_exitcode_str(rc),
3907                  (*exit_reason? ": " : ""), exit_reason, rsc->id,
3908                  node->details->uname, last_change_str(xml_op), rc,
3909                  ID(xml_op));
3910 
3911         update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure,
3912                               on_fail, data_set);
3913         crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
3914         pe__set_resource_flags(rsc, pe_rsc_failure_ignored);
3915 
3916         record_failed_op(xml_op, node, rsc, data_set);
3917 
3918         if ((failure_strategy == action_fail_restart_container)
3919             && cmp_on_fail(*on_fail, action_fail_recover) <= 0) {
3920             *on_fail = failure_strategy;
3921         }
3922 
3923     } else {
3924         unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail,
3925                               data_set);
3926 
3927         if (status == PCMK_EXEC_ERROR_HARD) {
3928             do_crm_log((rc != PCMK_OCF_NOT_INSTALLED)? LOG_ERR : LOG_NOTICE,
3929                        "Preventing %s from restarting on %s because "
3930                        "of hard failure (%s%s%s)" CRM_XS " rc=%d id=%s",
3931                        parent->id, node->details->uname,
3932                        services_ocf_exitcode_str(rc),
3933                        (*exit_reason? ": " : ""), exit_reason,
3934                        rc, ID(xml_op));
3935             resource_location(parent, node, -INFINITY, "hard-error", data_set);
3936 
3937         } else if (status == PCMK_EXEC_ERROR_FATAL) {
3938             crm_err("Preventing %s from restarting anywhere because "
3939                     "of fatal failure (%s%s%s) " CRM_XS " rc=%d id=%s",
3940                     parent->id, services_ocf_exitcode_str(rc),
3941                     (*exit_reason? ": " : ""), exit_reason,
3942                     rc, ID(xml_op));
3943             resource_location(parent, NULL, -INFINITY, "fatal-error", data_set);
3944         }
3945     }
3946 
3947 done:
3948     pe_rsc_trace(rsc, "Resource %s after %s: role=%s, next=%s",
3949                  rsc->id, task, role2text(rsc->role),
3950                  role2text(rsc->next_role));
3951 }
3952 
3953 static void
3954 add_node_attrs(xmlNode *xml_obj, pe_node_t *node, bool overwrite,
3955                pe_working_set_t *data_set)
3956 {
3957     const char *cluster_name = NULL;
3958 
3959     pe_rule_eval_data_t rule_data = {
3960         .node_hash = NULL,
3961         .role = RSC_ROLE_UNKNOWN,
3962         .now = data_set->now,
3963         .match_data = NULL,
3964         .rsc_data = NULL,
3965         .op_data = NULL
3966     };
3967 
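         /* Seed the attribute table with built-in node attributes: the node
          * name, the node ID, and whether this node is the DC
          */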
3968     g_hash_table_insert(node->details->attrs,
3969                         strdup(CRM_ATTR_UNAME), strdup(node->details->uname));
3970 
3971     g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_ID),
3972                         strdup(node->details->id));
3973     if (pcmk__str_eq(node->details->id, data_set->dc_uuid, pcmk__str_casei)) {
3974         data_set->dc_node = node;
3975         node->details->is_dc = TRUE;
3976         g_hash_table_insert(node->details->attrs,
3977                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_TRUE));
3978     } else {
3979         g_hash_table_insert(node->details->attrs,
3980                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_FALSE));
3981     }
3982 
3983     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
3984     if (cluster_name) {
3985         g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_CLUSTER_NAME),
3986                             strdup(cluster_name));
3987     }
3988 
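         /* Evaluate the node's instance_attributes sets (honoring any rules)
          * and merge the results into the attribute table
          */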
3989     pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_ATTR_SETS, &rule_data,
3990                                node->details->attrs, NULL, overwrite, data_set);
3991 
3992     if (pe_node_attribute_raw(node, CRM_ATTR_SITE_NAME) == NULL) {
3993         const char *site_name = pe_node_attribute_raw(node, "site-name");
3994 
3995         if (site_name) {
3996             g_hash_table_insert(node->details->attrs,
3997                                 strdup(CRM_ATTR_SITE_NAME),
3998                                 strdup(site_name));
3999 
4000         } else if (cluster_name) {
4001             /* Default to cluster-name if unset */
4002             g_hash_table_insert(node->details->attrs,
4003                                 strdup(CRM_ATTR_SITE_NAME),
4004                                 strdup(cluster_name));
4005         }
4006     }
4007 }
4008 
4009 static GList *
4010 extract_operations(const char *node, const char *rsc, xmlNode *rsc_entry, gboolean active_filter)
4011 {
4012     int counter = -1;
4013     int stop_index = -1;
4014     int start_index = -1;
4015 
4016     xmlNode *rsc_op = NULL;
4017 
4018     GList *gIter = NULL;
4019     GList *op_list = NULL;
4020     GList *sorted_op_list = NULL;
4021 
4022     /* Collect this resource's lrm_rsc_op entries, tagging each with its
4023      * resource ID and node name
4024      */
4025 
4026     for (rsc_op = pcmk__xe_first_child(rsc_entry);
4027          rsc_op != NULL; rsc_op = pcmk__xe_next(rsc_op)) {
4028 
4029         if (pcmk__str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP,
4030                          pcmk__str_none)) {
4031             crm_xml_add(rsc_op, "resource", rsc);
4032             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
4033             op_list = g_list_prepend(op_list, rsc_op);
4034         }
4035     }
4036 
4037     if (op_list == NULL) {
4038         /* if there are no operations, there is nothing to do */
4039         return NULL;
4040     }
4041 
4042     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
4043 
4044     /* Without an active filter, return all operations sorted by call ID */
4045     if (active_filter == FALSE) {
4046         return sorted_op_list;
4047     }
4048 
4049     op_list = NULL;
4050 
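         /* Find where the most recent start and stop fall in the sorted
          * history, so the loop below keeps only operations from the current
          * activation of the resource
          */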
4051     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
4052 
4053     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
4054         xmlNode *rsc_op = (xmlNode *) gIter->data;
4055 
4056         counter++;
4057 
4058         if (start_index < stop_index) {
4059             crm_trace("Skipping %s: not active", ID(rsc_entry));
4060             break;
4061 
4062         } else if (counter < start_index) {
4063             crm_trace("Skipping %s: old", ID(rsc_op));
4064             continue;
4065         }
4066         op_list = g_list_append(op_list, rsc_op);
4067     }
4068 
4069     g_list_free(sorted_op_list);
4070     return op_list;
4071 }
4072 
4073 GList *
4074 find_operations(const char *rsc, const char *node, gboolean active_filter,
4075                 pe_working_set_t *data_set)
4076 {
4077     GList *output = NULL;
4078     GList *intermediate = NULL;
4079 
4080     xmlNode *tmp = NULL;
4081     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
4082 
4083     pe_node_t *this_node = NULL;
4084 
4085     xmlNode *node_state = NULL;
4086 
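         /* Walk each node_state entry in the status section, determine the
          * node's online status, and collect matching resource operations
          */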
4087     for (node_state = pcmk__xe_first_child(status); node_state != NULL;
4088          node_state = pcmk__xe_next(node_state)) {
4089 
4090         if (pcmk__str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
4091             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
4092 
4093             if (node != NULL && !pcmk__str_eq(uname, node, pcmk__str_casei)) {
4094                 continue;
4095             }
4096 
4097             this_node = pe_find_node(data_set->nodes, uname);
4098             if (this_node == NULL) {
4099                 CRM_LOG_ASSERT(this_node != NULL);
4100                 continue;
4101 
4102             } else if (pe__is_guest_or_remote_node(this_node)) {
4103                 determine_remote_online_status(data_set, this_node);
4104 
4105             } else {
4106                 determine_online_status(node_state, this_node, data_set);
4107             }
4108 
4109             if (this_node->details->online
4110                 || pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
4111                 /* Offline nodes run no resources, unless fencing is
4112                  * enabled, in which case resource start events must be
4113                  * ordered after the fencing
4114                  */
4115                 xmlNode *lrm_rsc = NULL;
4116 
4117                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
4118                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
4119 
4120                 for (lrm_rsc = pcmk__xe_first_child(tmp); lrm_rsc != NULL;
4121                      lrm_rsc = pcmk__xe_next(lrm_rsc)) {
4122 
4123                     if (pcmk__str_eq((const char *)lrm_rsc->name,
4124                                      XML_LRM_TAG_RESOURCE, pcmk__str_none)) {
4125 
4126                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
4127 
4128                         if (rsc != NULL && !pcmk__str_eq(rsc_id, rsc, pcmk__str_casei)) {
4129                             continue;
4130                         }
4131 
4132                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
4133                         output = g_list_concat(output, intermediate);
4134                     }
4135                 }
4136             }
4137         }
4138     }
4139 
4140     return output;
4141 }
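
/* A minimal usage sketch (hypothetical caller, not part of this file): with
 * an already-populated pe_working_set_t, gather every recorded operation for
 * one resource across all nodes and log where each ran. The resource ID
 * "my-rsc" is illustrative.
 *
 *     GList *ops = find_operations("my-rsc", NULL, FALSE, data_set);
 *
 *     for (GList *iter = ops; iter != NULL; iter = iter->next) {
 *         xmlNode *op = (xmlNode *) iter->data;
 *
 *         // extract_operations() added the node name to each entry
 *         crm_info("Operation %s ran on %s", ID(op),
 *                  crm_element_value(op, XML_ATTR_UNAME));
 *     }
 *
 *     // The entries point into the CIB XML, so free only the list itself
 *     g_list_free(ops);
 */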
