root/lib/pengine/unpack.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. is_dangling_guest_node
  2. pe_fence_node
  3. set_if_xpath
  4. unpack_config
  5. pe_create_node
  6. expand_remote_rsc_meta
  7. handle_startup_fencing
  8. unpack_nodes
  9. setup_container
  10. unpack_remote_nodes
  11. link_rsc2remotenode
  12. destroy_tag
  13. unpack_resources
  14. unpack_tags
  15. unpack_ticket_state
  16. unpack_tickets_state
  17. unpack_handle_remote_attrs
  18. unpack_transient_attributes
  19. unpack_node_state
  20. unpack_node_history
  21. unpack_status
  22. determine_online_status_no_fencing
  23. determine_online_status_fencing
  24. determine_remote_online_status
  25. determine_online_status
  26. pe_base_name_end
  27. clone_strip
  28. clone_zero
  29. create_fake_resource
  30. create_anonymous_orphan
  31. find_anonymous_clone
  32. unpack_find_resource
  33. process_orphan_resource
  34. process_rsc_state
  35. process_recurring
  36. calculate_active_ops
  37. unpack_shutdown_lock
  38. unpack_lrm_resource
  39. handle_orphaned_container_fillers
  40. unpack_node_lrm
  41. set_active
  42. set_node_score
  43. find_lrm_op
  44. pe__call_id
  45. stop_happened_after
  46. unpack_migrate_to_success
  47. newer_op
  48. unpack_migrate_to_failure
  49. unpack_migrate_from_failure
  50. record_failed_op
  51. get_op_key
  52. last_change_str
  53. cmp_on_fail
  54. unpack_rsc_op_failure
  55. determine_op_status
  56. should_clear_for_param_change
  57. order_after_remote_fencing
  58. should_ignore_failure_timeout
  59. check_operation_expiry
  60. pe__target_rc_from_xml
  61. get_action_on_fail
  62. update_resource_state
  63. remap_monitor_rc
  64. unpack_rsc_op
  65. add_node_attrs
  66. extract_operations
  67. find_operations

   1 /*
   2  * Copyright 2004-2021 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU Lesser General Public License
   7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdio.h>
  13 #include <string.h>
  14 #include <glib.h>
  15 #include <time.h>
  16 
  17 #include <crm/crm.h>
  18 #include <crm/services.h>
  19 #include <crm/msg_xml.h>
  20 #include <crm/common/xml.h>
  21 #include <crm/common/xml_internal.h>
  22 
  23 #include <crm/common/util.h>
  24 #include <crm/pengine/rules.h>
  25 #include <crm/pengine/internal.h>
  26 #include <pe_status_private.h>
  27 
  28 CRM_TRACE_INIT_DATA(pe_status);
  29 
  30 /* This uses pcmk__set_flags_as()/pcmk__clear_flags_as() directly rather than
  31  * use pe__set_working_set_flags()/pe__clear_working_set_flags() so that the
  32  * flag is stringified more readably in log messages.
  33  */
  34 #define set_config_flag(data_set, option, flag) do {                        \
  35         const char *scf_value = pe_pref((data_set)->config_hash, (option)); \
  36         if (scf_value != NULL) {                                            \
  37             if (crm_is_true(scf_value)) {                                   \
  38                 (data_set)->flags = pcmk__set_flags_as(__func__, __LINE__,  \
  39                                     LOG_TRACE, "Working set",               \
  40                                     crm_system_name, (data_set)->flags,     \
  41                                     (flag), #flag);                         \
  42             } else {                                                        \
  43                 (data_set)->flags = pcmk__clear_flags_as(__func__, __LINE__,\
  44                                     LOG_TRACE, "Working set",               \
  45                                     crm_system_name, (data_set)->flags,     \
  46                                     (flag), #flag);                         \
  47             }                                                               \
  48         }                                                                   \
  49     } while(0)
  50 
  51 static void unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
  52                           xmlNode **last_failure,
  53                           enum action_fail_response *failed,
  54                           pe_working_set_t *data_set);
  55 static void determine_remote_online_status(pe_working_set_t *data_set,
  56                                            pe_node_t *this_node);
  57 static void add_node_attrs(xmlNode *attrs, pe_node_t *node, bool overwrite,
  58                            pe_working_set_t *data_set);
  59 static void determine_online_status(xmlNode *node_state, pe_node_t *this_node,
  60                                     pe_working_set_t *data_set);
  61 
  62 static void unpack_node_lrm(pe_node_t *node, xmlNode *xml,
  63                             pe_working_set_t *data_set);
  64 
  65 
  66 // Bitmask for warnings we only want to print once
  67 uint32_t pe_wo = 0;
  68 
  69 static gboolean
  70 is_dangling_guest_node(pe_node_t *node)
     /* [previous][next][first][last][top][bottom][index][help] */
  71 {
  72     /* we are looking for a remote-node that was supposed to be mapped to a
  73      * container resource, but all traces of that container have disappeared 
  74      * from both the config and the status section. */
  75     if (pe__is_guest_or_remote_node(node) &&
  76         node->details->remote_rsc &&
  77         node->details->remote_rsc->container == NULL &&
  78         pcmk_is_set(node->details->remote_rsc->flags,
  79                     pe_rsc_orphan_container_filler)) {
  80         return TRUE;
  81     }
  82 
  83     return FALSE;
  84 }
  85 
  86 
  87 /*!
  88  * \brief Schedule a fence action for a node
  89  *
  90  * \param[in,out] data_set  Current working set of cluster
  91  * \param[in,out] node      Node to fence
  92  * \param[in]     reason    Text description of why fencing is needed
  93  * \param[in]     priority_delay  Whether to consider `priority-fencing-delay`
  94  */
  95 void
  96 pe_fence_node(pe_working_set_t * data_set, pe_node_t * node,
     /* [previous][next][first][last][top][bottom][index][help] */
  97               const char *reason, bool priority_delay)
  98 {
  99     CRM_CHECK(node, return);
 100 
 101     /* A guest node is fenced by marking its container as failed */
 102     if (pe__is_guest_node(node)) {
 103         pe_resource_t *rsc = node->details->remote_rsc->container;
 104 
 105         if (!pcmk_is_set(rsc->flags, pe_rsc_failed)) {
 106             if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 107                 crm_notice("Not fencing guest node %s "
 108                            "(otherwise would because %s): "
 109                            "its guest resource %s is unmanaged",
 110                            node->details->uname, reason, rsc->id);
 111             } else {
 112                 crm_warn("Guest node %s will be fenced "
 113                          "(by recovering its guest resource %s): %s",
 114                          node->details->uname, rsc->id, reason);
 115 
 116                 /* We don't mark the node as unclean because that would prevent the
 117                  * node from running resources. We want to allow it to run resources
 118                  * in this transition if the recovery succeeds.
 119                  */
 120                 node->details->remote_requires_reset = TRUE;
 121                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
 122             }
 123         }
 124 
 125     } else if (is_dangling_guest_node(node)) {
 126         crm_info("Cleaning up dangling connection for guest node %s: "
 127                  "fencing was already done because %s, "
 128                  "and guest resource no longer exists",
 129                  node->details->uname, reason);
 130         pe__set_resource_flags(node->details->remote_rsc,
 131                                pe_rsc_failed|pe_rsc_stop);
 132 
 133     } else if (pe__is_remote_node(node)) {
 134         pe_resource_t *rsc = node->details->remote_rsc;
 135 
 136         if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 137             crm_notice("Not fencing remote node %s "
 138                        "(otherwise would because %s): connection is unmanaged",
 139                        node->details->uname, reason);
 140         } else if(node->details->remote_requires_reset == FALSE) {
 141             node->details->remote_requires_reset = TRUE;
 142             crm_warn("Remote node %s %s: %s",
 143                      node->details->uname,
 144                      pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 145                      reason);
 146         }
 147         node->details->unclean = TRUE;
 148         // No need to apply `priority-fencing-delay` for remote nodes
 149         pe_fence_op(node, NULL, TRUE, reason, FALSE, data_set);
 150 
 151     } else if (node->details->unclean) {
 152         crm_trace("Cluster node %s %s because %s",
 153                   node->details->uname,
 154                   pe_can_fence(data_set, node)? "would also be fenced" : "also is unclean",
 155                   reason);
 156 
 157     } else {
 158         crm_warn("Cluster node %s %s: %s",
 159                  node->details->uname,
 160                  pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 161                  reason);
 162         node->details->unclean = TRUE;
 163         pe_fence_op(node, NULL, TRUE, reason, priority_delay, data_set);
 164     }
 165 }
 166 
 167 // @TODO xpaths can't handle templates, rules, or id-refs
 168 
 169 // nvpair with provides or requires set to unfencing
 170 #define XPATH_UNFENCING_NVPAIR XML_CIB_TAG_NVPAIR                \
 171     "[(@" XML_NVPAIR_ATTR_NAME "='" PCMK_STONITH_PROVIDES "'"    \
 172     "or @" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_REQUIRES "') " \
 173     "and @" XML_NVPAIR_ATTR_VALUE "='unfencing']"
 174 
 175 // unfencing in rsc_defaults or any resource
 176 #define XPATH_ENABLE_UNFENCING \
 177     "/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RESOURCES   \
 178     "//" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR                                               \
 179     "|/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RSCCONFIG  \
 180     "/" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR
 181 
 182 static void
 183 set_if_xpath(uint64_t flag, const char *xpath, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 184 {
 185     xmlXPathObjectPtr result = NULL;
 186 
 187     if (!pcmk_is_set(data_set->flags, flag)) {
 188         result = xpath_search(data_set->input, xpath);
 189         if (result && (numXpathResults(result) > 0)) {
 190             pe__set_working_set_flags(data_set, flag);
 191         }
 192         freeXpathObject(result);
 193     }
 194 }
 195 
 196 gboolean
 197 unpack_config(xmlNode * config, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 198 {
 199     const char *value = NULL;
 200     GHashTable *config_hash = pcmk__strkey_table(free, free);
 201 
 202     pe_rule_eval_data_t rule_data = {
 203         .node_hash = NULL,
 204         .role = RSC_ROLE_UNKNOWN,
 205         .now = data_set->now,
 206         .match_data = NULL,
 207         .rsc_data = NULL,
 208         .op_data = NULL
 209     };
 210 
 211     data_set->config_hash = config_hash;
 212 
 213     pe__unpack_dataset_nvpairs(config, XML_CIB_TAG_PROPSET, &rule_data, config_hash,
 214                                CIB_OPTIONS_FIRST, FALSE, data_set);
 215 
 216     verify_pe_options(data_set->config_hash);
 217 
 218     set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
 219     if (!pcmk_is_set(data_set->flags, pe_flag_startup_probes)) {
 220         crm_info("Startup probes: disabled (dangerous)");
 221     }
 222 
 223     value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
 224     if (value && crm_is_true(value)) {
 225         crm_info("Watchdog-based self-fencing will be performed via SBD if "
 226                  "fencing is required and stonith-watchdog-timeout is nonzero");
 227         pe__set_working_set_flags(data_set, pe_flag_have_stonith_resource);
 228     }
 229 
 230     /* Set certain flags via xpath here, so they can be used before the relevant
 231      * configuration sections are unpacked.
 232      */
 233     set_if_xpath(pe_flag_enable_unfencing, XPATH_ENABLE_UNFENCING, data_set);
 234 
 235     value = pe_pref(data_set->config_hash, "stonith-timeout");
 236     data_set->stonith_timeout = (int) crm_parse_interval_spec(value);
 237     crm_debug("STONITH timeout: %d", data_set->stonith_timeout);
 238 
 239     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 240     crm_debug("STONITH of failed nodes is %s",
 241               pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)? "enabled" : "disabled");
 242 
 243     data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
 244     if (!strcmp(data_set->stonith_action, "poweroff")) {
 245         pe_warn_once(pe_wo_poweroff,
 246                      "Support for stonith-action of 'poweroff' is deprecated "
 247                      "and will be removed in a future release (use 'off' instead)");
 248         data_set->stonith_action = "off";
 249     }
 250     crm_trace("STONITH will %s nodes", data_set->stonith_action);
 251 
 252     set_config_flag(data_set, "concurrent-fencing", pe_flag_concurrent_fencing);
 253     crm_debug("Concurrent fencing is %s",
 254               pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)? "enabled" : "disabled");
 255 
 256     value = pe_pref(data_set->config_hash,
 257                     XML_CONFIG_ATTR_PRIORITY_FENCING_DELAY);
 258     if (value) {
 259         data_set->priority_fencing_delay = crm_parse_interval_spec(value) / 1000;
 260         crm_trace("Priority fencing delay is %ds", data_set->priority_fencing_delay);
 261     }
 262 
 263     set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
 264     crm_debug("Stop all active resources: %s",
 265               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_stop_everything)));
 266 
 267     set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
 268     if (pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)) {
 269         crm_debug("Cluster is symmetric" " - resources can run anywhere by default");
 270     }
 271 
 272     value = pe_pref(data_set->config_hash, "no-quorum-policy");
 273 
 274     if (pcmk__str_eq(value, "ignore", pcmk__str_casei)) {
 275         data_set->no_quorum_policy = no_quorum_ignore;
 276 
 277     } else if (pcmk__str_eq(value, "freeze", pcmk__str_casei)) {
 278         data_set->no_quorum_policy = no_quorum_freeze;
 279 
 280     } else if (pcmk__str_eq(value, "demote", pcmk__str_casei)) {
 281         data_set->no_quorum_policy = no_quorum_demote;
 282 
 283     } else if (pcmk__str_eq(value, "suicide", pcmk__str_casei)) {
 284         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 285             int do_panic = 0;
 286 
 287             crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC,
 288                                   &do_panic);
 289             if (do_panic || pcmk_is_set(data_set->flags, pe_flag_have_quorum)) {
 290                 data_set->no_quorum_policy = no_quorum_suicide;
 291             } else {
 292                 crm_notice("Resetting no-quorum-policy to 'stop': cluster has never had quorum");
 293                 data_set->no_quorum_policy = no_quorum_stop;
 294             }
 295         } else {
 296             pcmk__config_err("Resetting no-quorum-policy to 'stop' because "
 297                              "fencing is disabled");
 298             data_set->no_quorum_policy = no_quorum_stop;
 299         }
 300 
 301     } else {
 302         data_set->no_quorum_policy = no_quorum_stop;
 303     }
 304 
 305     switch (data_set->no_quorum_policy) {
 306         case no_quorum_freeze:
 307             crm_debug("On loss of quorum: Freeze resources");
 308             break;
 309         case no_quorum_stop:
 310             crm_debug("On loss of quorum: Stop ALL resources");
 311             break;
 312         case no_quorum_demote:
 313             crm_debug("On loss of quorum: "
 314                       "Demote promotable resources and stop other resources");
 315             break;
 316         case no_quorum_suicide:
 317             crm_notice("On loss of quorum: Fence all remaining nodes");
 318             break;
 319         case no_quorum_ignore:
 320             crm_notice("On loss of quorum: Ignore");
 321             break;
 322     }
 323 
 324     set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
 325     crm_trace("Orphan resources are %s",
 326               pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)? "stopped" : "ignored");
 327 
 328     set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
 329     crm_trace("Orphan resource actions are %s",
 330               pcmk_is_set(data_set->flags, pe_flag_stop_action_orphans)? "stopped" : "ignored");
 331 
 332     value = pe_pref(data_set->config_hash, "remove-after-stop");
 333     if (value != NULL) {
 334         if (crm_is_true(value)) {
 335             pe__set_working_set_flags(data_set, pe_flag_remove_after_stop);
 336 #ifndef PCMK__COMPAT_2_0
 337             pe_warn_once(pe_wo_remove_after,
 338                          "Support for the remove-after-stop cluster property is"
 339                          " deprecated and will be removed in a future release");
 340 #endif
 341         } else {
 342             pe__clear_working_set_flags(data_set, pe_flag_remove_after_stop);
 343         }
 344     }
 345 
 346     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
 347     crm_trace("Maintenance mode: %s",
 348               pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_maintenance_mode)));
 349 
 350     set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
 351     crm_trace("Start failures are %s",
 352               pcmk_is_set(data_set->flags, pe_flag_start_failure_fatal)? "always fatal" : "handled by failcount");
 353 
 354     if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
 355         set_config_flag(data_set, "startup-fencing", pe_flag_startup_fencing);
 356     }
 357     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 358         crm_trace("Unseen nodes will be fenced");
 359     } else {
 360         pe_warn_once(pe_wo_blind, "Blind faith: not fencing unseen nodes");
 361     }
 362 
 363     pcmk__score_red = char2score(pe_pref(data_set->config_hash, "node-health-red"));
 364     pcmk__score_green = char2score(pe_pref(data_set->config_hash, "node-health-green"));
 365     pcmk__score_yellow = char2score(pe_pref(data_set->config_hash, "node-health-yellow"));
 366 
 367     crm_debug("Node scores: 'red' = %s, 'yellow' = %s, 'green' = %s",
 368              pe_pref(data_set->config_hash, "node-health-red"),
 369              pe_pref(data_set->config_hash, "node-health-yellow"),
 370              pe_pref(data_set->config_hash, "node-health-green"));
 371 
 372     data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
 373     crm_trace("Placement strategy: %s", data_set->placement_strategy);
 374 
 375     set_config_flag(data_set, "shutdown-lock", pe_flag_shutdown_lock);
 376     crm_trace("Resources will%s be locked to cleanly shut down nodes",
 377               (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)? "" : " not"));
 378     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
 379         value = pe_pref(data_set->config_hash,
 380                         XML_CONFIG_ATTR_SHUTDOWN_LOCK_LIMIT);
 381         data_set->shutdown_lock = crm_parse_interval_spec(value) / 1000;
 382         crm_trace("Shutdown locks expire after %us", data_set->shutdown_lock);
 383     }
 384 
 385     return TRUE;
 386 }
 387 
 388 pe_node_t *
 389 pe_create_node(const char *id, const char *uname, const char *type,
     /* [previous][next][first][last][top][bottom][index][help] */
 390                const char *score, pe_working_set_t * data_set)
 391 {
 392     pe_node_t *new_node = NULL;
 393 
 394     if (pe_find_node(data_set->nodes, uname) != NULL) {
 395         pcmk__config_warn("More than one node entry has name '%s'", uname);
 396     }
 397 
 398     new_node = calloc(1, sizeof(pe_node_t));
 399     if (new_node == NULL) {
 400         return NULL;
 401     }
 402 
 403     new_node->weight = char2score(score);
 404     new_node->fixed = FALSE;
 405     new_node->details = calloc(1, sizeof(struct pe_node_shared_s));
 406 
 407     if (new_node->details == NULL) {
 408         free(new_node);
 409         return NULL;
 410     }
 411 
 412     crm_trace("Creating node for entry %s/%s", uname, id);
 413     new_node->details->id = id;
 414     new_node->details->uname = uname;
 415     new_node->details->online = FALSE;
 416     new_node->details->shutdown = FALSE;
 417     new_node->details->rsc_discovery_enabled = TRUE;
 418     new_node->details->running_rsc = NULL;
 419     new_node->details->type = node_ping;
 420 
 421     if (pcmk__str_eq(type, "remote", pcmk__str_casei)) {
 422         new_node->details->type = node_remote;
 423         pe__set_working_set_flags(data_set, pe_flag_have_remote_nodes);
 424     } else if (pcmk__str_eq(type, "member", pcmk__str_null_matches | pcmk__str_casei)) {
 425         new_node->details->type = node_member;
 426     }
 427 
 428     new_node->details->attrs = pcmk__strkey_table(free, free);
 429 
 430     if (pe__is_guest_or_remote_node(new_node)) {
 431         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 432                             strdup("remote"));
 433     } else {
 434         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 435                             strdup("cluster"));
 436     }
 437 
 438     new_node->details->utilization = pcmk__strkey_table(free, free);
 439     new_node->details->digest_cache = pcmk__strkey_table(free,
 440                                                           pe__free_digests);
 441 
 442     data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node, sort_node_uname);
 443     return new_node;
 444 }
 445 
 446 static const char *
 447 expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, pe_working_set_t *data)
     /* [previous][next][first][last][top][bottom][index][help] */
 448 {
 449     xmlNode *attr_set = NULL;
 450     xmlNode *attr = NULL;
 451 
 452     const char *container_id = ID(xml_obj);
 453     const char *remote_name = NULL;
 454     const char *remote_server = NULL;
 455     const char *remote_port = NULL;
 456     const char *connect_timeout = "60s";
 457     const char *remote_allow_migrate=NULL;
 458     const char *is_managed = NULL;
 459 
 460     for (attr_set = pcmk__xe_first_child(xml_obj); attr_set != NULL;
 461          attr_set = pcmk__xe_next(attr_set)) {
 462 
 463         if (!pcmk__str_eq((const char *)attr_set->name, XML_TAG_META_SETS,
 464                           pcmk__str_casei)) {
 465             continue;
 466         }
 467 
 468         for (attr = pcmk__xe_first_child(attr_set); attr != NULL;
 469              attr = pcmk__xe_next(attr)) {
 470             const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
 471             const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);
 472 
 473             if (pcmk__str_eq(name, XML_RSC_ATTR_REMOTE_NODE, pcmk__str_casei)) {
 474                 remote_name = value;
 475             } else if (pcmk__str_eq(name, "remote-addr", pcmk__str_casei)) {
 476                 remote_server = value;
 477             } else if (pcmk__str_eq(name, "remote-port", pcmk__str_casei)) {
 478                 remote_port = value;
 479             } else if (pcmk__str_eq(name, "remote-connect-timeout", pcmk__str_casei)) {
 480                 connect_timeout = value;
 481             } else if (pcmk__str_eq(name, "remote-allow-migrate", pcmk__str_casei)) {
 482                 remote_allow_migrate=value;
 483             } else if (pcmk__str_eq(name, XML_RSC_ATTR_MANAGED, pcmk__str_casei)) {
 484                 is_managed = value;
 485             }
 486         }
 487     }
 488 
 489     if (remote_name == NULL) {
 490         return NULL;
 491     }
 492 
 493     if (pe_find_resource(data->resources, remote_name) != NULL) {
 494         return NULL;
 495     }
 496 
 497     pe_create_remote_xml(parent, remote_name, container_id,
 498                          remote_allow_migrate, is_managed,
 499                          connect_timeout, remote_server, remote_port);
 500     return remote_name;
 501 }
 502 
 503 static void
 504 handle_startup_fencing(pe_working_set_t *data_set, pe_node_t *new_node)
     /* [previous][next][first][last][top][bottom][index][help] */
 505 {
 506     if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
 507         /* Ignore fencing for remote nodes that don't have a connection resource
 508          * associated with them. This happens when remote node entries get left
 509          * in the nodes section after the connection resource is removed.
 510          */
 511         return;
 512     }
 513 
 514     if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
 515         // All nodes are unclean until we've seen their status entry
 516         new_node->details->unclean = TRUE;
 517 
 518     } else {
 519         // Blind faith ...
 520         new_node->details->unclean = FALSE;
 521     }
 522 
 523     /* We need to be able to determine if a node's status section
 524      * exists or not separate from whether the node is unclean. */
 525     new_node->details->unseen = TRUE;
 526 }
 527 
 528 gboolean
 529 unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 530 {
 531     xmlNode *xml_obj = NULL;
 532     pe_node_t *new_node = NULL;
 533     const char *id = NULL;
 534     const char *uname = NULL;
 535     const char *type = NULL;
 536     const char *score = NULL;
 537 
 538     pe_rule_eval_data_t rule_data = {
 539         .node_hash = NULL,
 540         .role = RSC_ROLE_UNKNOWN,
 541         .now = data_set->now,
 542         .match_data = NULL,
 543         .rsc_data = NULL,
 544         .op_data = NULL
 545     };
 546 
 547     for (xml_obj = pcmk__xe_first_child(xml_nodes); xml_obj != NULL;
 548          xml_obj = pcmk__xe_next(xml_obj)) {
 549 
 550         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, pcmk__str_none)) {
 551             new_node = NULL;
 552 
 553             id = crm_element_value(xml_obj, XML_ATTR_ID);
 554             uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
 555             type = crm_element_value(xml_obj, XML_ATTR_TYPE);
 556             score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
 557             crm_trace("Processing node %s/%s", uname, id);
 558 
 559             if (id == NULL) {
 560                 pcmk__config_err("Ignoring <" XML_CIB_TAG_NODE
 561                                  "> entry in configuration without id");
 562                 continue;
 563             }
 564             new_node = pe_create_node(id, uname, type, score, data_set);
 565 
 566             if (new_node == NULL) {
 567                 return FALSE;
 568             }
 569 
 570 /*              if(data_set->have_quorum == FALSE */
 571 /*                 && data_set->no_quorum_policy == no_quorum_stop) { */
 572 /*                      /\* start shutting resources down *\/ */
 573 /*                      new_node->weight = -INFINITY; */
 574 /*              } */
 575 
 576             handle_startup_fencing(data_set, new_node);
 577 
 578             add_node_attrs(xml_obj, new_node, FALSE, data_set);
 579             pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_UTILIZATION, &rule_data,
 580                                        new_node->details->utilization, NULL,
 581                                        FALSE, data_set);
 582 
 583             crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
 584         }
 585     }
 586 
 587     if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
 588         crm_info("Creating a fake local node");
 589         pe_create_node(data_set->localhost, data_set->localhost, NULL, 0,
 590                        data_set);
 591     }
 592 
 593     return TRUE;
 594 }
 595 
 596 static void
 597 setup_container(pe_resource_t * rsc, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 598 {
 599     const char *container_id = NULL;
 600 
 601     if (rsc->children) {
 602         g_list_foreach(rsc->children, (GFunc) setup_container, data_set);
 603         return;
 604     }
 605 
 606     container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
 607     if (container_id && !pcmk__str_eq(container_id, rsc->id, pcmk__str_casei)) {
 608         pe_resource_t *container = pe_find_resource(data_set->resources, container_id);
 609 
 610         if (container) {
 611             rsc->container = container;
 612             pe__set_resource_flags(container, pe_rsc_is_container);
 613             container->fillers = g_list_append(container->fillers, rsc);
 614             pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
 615         } else {
 616             pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
 617         }
 618     }
 619 }
 620 
 621 gboolean
 622 unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 623 {
 624     xmlNode *xml_obj = NULL;
 625 
 626     /* Create remote nodes and guest nodes from the resource configuration
 627      * before unpacking resources.
 628      */
 629     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 630          xml_obj = pcmk__xe_next(xml_obj)) {
 631 
 632         const char *new_node_id = NULL;
 633 
 634         /* Check for remote nodes, which are defined by ocf:pacemaker:remote
 635          * primitives.
 636          */
 637         if (xml_contains_remote_node(xml_obj)) {
 638             new_node_id = ID(xml_obj);
 639             /* The "pe_find_node" check is here to make sure we don't iterate over
 640              * an expanded node that has already been added to the node list. */
 641             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 642                 crm_trace("Found remote node %s defined by resource %s",
 643                           new_node_id, ID(xml_obj));
 644                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 645                                data_set);
 646             }
 647             continue;
 648         }
 649 
 650         /* Check for guest nodes, which are defined by special meta-attributes
 651          * of a primitive of any type (for example, VirtualDomain or Xen).
 652          */
 653         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RESOURCE, pcmk__str_none)) {
 654             /* This will add an ocf:pacemaker:remote primitive to the
 655              * configuration for the guest node's connection, to be unpacked
 656              * later.
 657              */
 658             new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, data_set);
 659             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 660                 crm_trace("Found guest node %s in resource %s",
 661                           new_node_id, ID(xml_obj));
 662                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 663                                data_set);
 664             }
 665             continue;
 666         }
 667 
 668         /* Check for guest nodes inside a group. Clones are currently not
 669          * supported as guest nodes.
 670          */
 671         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_GROUP, pcmk__str_none)) {
 672             xmlNode *xml_obj2 = NULL;
 673             for (xml_obj2 = pcmk__xe_first_child(xml_obj); xml_obj2 != NULL;
 674                  xml_obj2 = pcmk__xe_next(xml_obj2)) {
 675 
 676                 new_node_id = expand_remote_rsc_meta(xml_obj2, xml_resources, data_set);
 677 
 678                 if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 679                     crm_trace("Found guest node %s in resource %s inside group %s",
 680                               new_node_id, ID(xml_obj2), ID(xml_obj));
 681                     pe_create_node(new_node_id, new_node_id, "remote", NULL,
 682                                    data_set);
 683                 }
 684             }
 685         }
 686     }
 687     return TRUE;
 688 }
 689 
 690 /* Call this after all the nodes and resources have been
 691  * unpacked, but before the status section is read.
 692  *
 693  * A remote node's online status is reflected by the state
 694  * of the remote node's connection resource. We need to link
 695  * the remote node to this connection resource so we can have
 696  * easy access to the connection resource during the scheduler calculations.
 697  */
 698 static void
 699 link_rsc2remotenode(pe_working_set_t *data_set, pe_resource_t *new_rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
 700 {
 701     pe_node_t *remote_node = NULL;
 702 
 703     if (new_rsc->is_remote_node == FALSE) {
 704         return;
 705     }
 706 
 707     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 708         /* remote_nodes and remote_resources are not linked in quick location calculations */
 709         return;
 710     }
 711 
 712     remote_node = pe_find_node(data_set->nodes, new_rsc->id);
 713     CRM_CHECK(remote_node != NULL, return;);
 714 
 715     pe_rsc_trace(new_rsc, "Linking remote connection resource %s to node %s",
 716                  new_rsc->id, remote_node->details->uname);
 717     remote_node->details->remote_rsc = new_rsc;
 718 
 719     if (new_rsc->container == NULL) {
 720         /* Handle start-up fencing for remote nodes (as opposed to guest nodes)
 721          * the same as is done for cluster nodes.
 722          */
 723         handle_startup_fencing(data_set, remote_node);
 724 
 725     } else {
 726         /* pe_create_node() marks the new node as "remote" or "cluster"; now
 727          * that we know the node is a guest node, update it correctly.
 728          */
 729         g_hash_table_replace(remote_node->details->attrs, strdup(CRM_ATTR_KIND),
 730                              strdup("container"));
 731     }
 732 }
 733 
 734 static void
 735 destroy_tag(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 736 {
 737     pe_tag_t *tag = data;
 738 
 739     if (tag) {
 740         free(tag->id);
 741         g_list_free_full(tag->refs, free);
 742         free(tag);
 743     }
 744 }
 745 
 746 /*!
 747  * \internal
 748  * \brief Parse configuration XML for resource information
 749  *
 750  * \param[in]     xml_resources  Top of resource configuration XML
 751  * \param[in,out] data_set       Where to put resource information
 752  *
 753  * \return TRUE
 754  *
 755  * \note unpack_remote_nodes() MUST be called before this, so that the nodes can
 756  *       be used when common_unpack() calls resource_location()
 757  */
 758 gboolean
 759 unpack_resources(xmlNode * xml_resources, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 760 {
 761     xmlNode *xml_obj = NULL;
 762     GList *gIter = NULL;
 763 
 764     data_set->template_rsc_sets = pcmk__strkey_table(free, destroy_tag);
 765 
 766     for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
 767          xml_obj = pcmk__xe_next(xml_obj)) {
 768 
 769         pe_resource_t *new_rsc = NULL;
 770 
 771         if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE, pcmk__str_none)) {
 772             const char *template_id = ID(xml_obj);
 773 
 774             if (template_id && g_hash_table_lookup_extended(data_set->template_rsc_sets,
 775                                                             template_id, NULL, NULL) == FALSE) {
 776                 /* Record the template's ID for the knowledge of its existence anyway. */
 777                 g_hash_table_insert(data_set->template_rsc_sets, strdup(template_id), NULL);
 778             }
 779             continue;
 780         }
 781 
 782         crm_trace("Beginning unpack... <%s id=%s... >", crm_element_name(xml_obj), ID(xml_obj));
 783         if (common_unpack(xml_obj, &new_rsc, NULL, data_set) && (new_rsc != NULL)) {
 784             data_set->resources = g_list_append(data_set->resources, new_rsc);
 785             pe_rsc_trace(new_rsc, "Added resource %s", new_rsc->id);
 786 
 787         } else {
 788             pcmk__config_err("Ignoring <%s> resource '%s' "
 789                              "because configuration is invalid",
 790                              crm_element_name(xml_obj), crm_str(ID(xml_obj)));
 791             if (new_rsc != NULL && new_rsc->fns != NULL) {
 792                 new_rsc->fns->free(new_rsc);
 793             }
 794         }
 795     }
 796 
 797     for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
 798         pe_resource_t *rsc = (pe_resource_t *) gIter->data;
 799 
 800         setup_container(rsc, data_set);
 801         link_rsc2remotenode(data_set, rsc);
 802     }
 803 
 804     data_set->resources = g_list_sort(data_set->resources, sort_rsc_priority);
 805     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
 806         /* Ignore */
 807 
 808     } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
 809                && !pcmk_is_set(data_set->flags, pe_flag_have_stonith_resource)) {
 810 
 811         pcmk__config_err("Resource start-up disabled since no STONITH resources have been defined");
 812         pcmk__config_err("Either configure some or disable STONITH with the stonith-enabled option");
 813         pcmk__config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
 814     }
 815 
 816     return TRUE;
 817 }
 818 
 819 gboolean
 820 unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 821 {
 822     xmlNode *xml_tag = NULL;
 823 
 824     data_set->tags = pcmk__strkey_table(free, destroy_tag);
 825 
 826     for (xml_tag = pcmk__xe_first_child(xml_tags); xml_tag != NULL;
 827          xml_tag = pcmk__xe_next(xml_tag)) {
 828 
 829         xmlNode *xml_obj_ref = NULL;
 830         const char *tag_id = ID(xml_tag);
 831 
 832         if (!pcmk__str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, pcmk__str_none)) {
 833             continue;
 834         }
 835 
 836         if (tag_id == NULL) {
 837             pcmk__config_err("Ignoring <%s> without " XML_ATTR_ID,
 838                              crm_element_name(xml_tag));
 839             continue;
 840         }
 841 
 842         for (xml_obj_ref = pcmk__xe_first_child(xml_tag); xml_obj_ref != NULL;
 843              xml_obj_ref = pcmk__xe_next(xml_obj_ref)) {
 844 
 845             const char *obj_ref = ID(xml_obj_ref);
 846 
 847             if (!pcmk__str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, pcmk__str_none)) {
 848                 continue;
 849             }
 850 
 851             if (obj_ref == NULL) {
 852                 pcmk__config_err("Ignoring <%s> for tag '%s' without " XML_ATTR_ID,
 853                                  crm_element_name(xml_obj_ref), tag_id);
 854                 continue;
 855             }
 856 
 857             if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
 858                 return FALSE;
 859             }
 860         }
 861     }
 862 
 863     return TRUE;
 864 }
 865 
 866 /* The ticket state section:
 867  * "/cib/status/tickets/ticket_state" */
 868 static gboolean
 869 unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 870 {
 871     const char *ticket_id = NULL;
 872     const char *granted = NULL;
 873     const char *last_granted = NULL;
 874     const char *standby = NULL;
 875     xmlAttrPtr xIter = NULL;
 876 
 877     pe_ticket_t *ticket = NULL;
 878 
 879     ticket_id = ID(xml_ticket);
 880     if (pcmk__str_empty(ticket_id)) {
 881         return FALSE;
 882     }
 883 
 884     crm_trace("Processing ticket state for %s", ticket_id);
 885 
 886     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
 887     if (ticket == NULL) {
 888         ticket = ticket_new(ticket_id, data_set);
 889         if (ticket == NULL) {
 890             return FALSE;
 891         }
 892     }
 893 
 894     for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
 895         const char *prop_name = (const char *)xIter->name;
 896         const char *prop_value = crm_element_value(xml_ticket, prop_name);
 897 
 898         if (pcmk__str_eq(prop_name, XML_ATTR_ID, pcmk__str_none)) {
 899             continue;
 900         }
 901         g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
 902     }
 903 
 904     granted = g_hash_table_lookup(ticket->state, "granted");
 905     if (granted && crm_is_true(granted)) {
 906         ticket->granted = TRUE;
 907         crm_info("We have ticket '%s'", ticket->id);
 908     } else {
 909         ticket->granted = FALSE;
 910         crm_info("We do not have ticket '%s'", ticket->id);
 911     }
 912 
 913     last_granted = g_hash_table_lookup(ticket->state, "last-granted");
 914     if (last_granted) {
 915         long long last_granted_ll;
 916 
 917         pcmk__scan_ll(last_granted, &last_granted_ll, 0LL);
 918         ticket->last_granted = (time_t) last_granted_ll;
 919     }
 920 
 921     standby = g_hash_table_lookup(ticket->state, "standby");
 922     if (standby && crm_is_true(standby)) {
 923         ticket->standby = TRUE;
 924         if (ticket->granted) {
 925             crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
 926         }
 927     } else {
 928         ticket->standby = FALSE;
 929     }
 930 
 931     crm_trace("Done with ticket state for %s", ticket_id);
 932 
 933     return TRUE;
 934 }
 935 
 936 static gboolean
 937 unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 938 {
 939     xmlNode *xml_obj = NULL;
 940 
 941     for (xml_obj = pcmk__xe_first_child(xml_tickets); xml_obj != NULL;
 942          xml_obj = pcmk__xe_next(xml_obj)) {
 943 
 944         if (!pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, pcmk__str_none)) {
 945             continue;
 946         }
 947         unpack_ticket_state(xml_obj, data_set);
 948     }
 949 
 950     return TRUE;
 951 }
 952 
 953 static void
 954 unpack_handle_remote_attrs(pe_node_t *this_node, xmlNode *state, pe_working_set_t * data_set) 
     /* [previous][next][first][last][top][bottom][index][help] */
 955 {
 956     const char *resource_discovery_enabled = NULL;
 957     xmlNode *attrs = NULL;
 958     pe_resource_t *rsc = NULL;
 959 
 960     if (!pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
 961         return;
 962     }
 963 
 964     if ((this_node == NULL) || !pe__is_guest_or_remote_node(this_node)) {
 965         return;
 966     }
 967     crm_trace("Processing remote node id=%s, uname=%s", this_node->details->id, this_node->details->uname);
 968 
 969     pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_MAINTENANCE),
 970                        &(this_node->details->remote_maintenance), 0);
 971 
 972     rsc = this_node->details->remote_rsc;
 973     if (this_node->details->remote_requires_reset == FALSE) {
 974         this_node->details->unclean = FALSE;
 975         this_node->details->unseen = FALSE;
 976     }
 977     attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
 978     add_node_attrs(attrs, this_node, TRUE, data_set);
 979 
 980     if (pe__shutdown_requested(this_node)) {
 981         crm_info("Node %s is shutting down", this_node->details->uname);
 982         this_node->details->shutdown = TRUE;
 983     }
 984  
 985     if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
 986         crm_info("Node %s is in standby-mode", this_node->details->uname);
 987         this_node->details->standby = TRUE;
 988     }
 989 
 990     if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance")) ||
 991         ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed))) {
 992         crm_info("Node %s is in maintenance-mode", this_node->details->uname);
 993         this_node->details->maintenance = TRUE;
 994     }
 995 
 996     resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
 997     if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
 998         if (pe__is_remote_node(this_node)
 999             && !pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1000             crm_warn("Ignoring %s attribute on remote node %s because stonith is disabled",
1001                      XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
1002         } else {
1003             /* This is either a remote node with fencing enabled, or a guest
1004              * node. We don't care whether fencing is enabled when fencing guest
1005              * nodes, because they are "fenced" by recovering their containing
1006              * resource.
1007              */
1008             crm_info("Node %s has resource discovery disabled", this_node->details->uname);
1009             this_node->details->rsc_discovery_enabled = FALSE;
1010         }
1011     }
1012 }
1013 
1014 /*!
1015  * \internal
1016  * \brief Unpack a cluster node's transient attributes
1017  *
1018  * \param[in] state     CIB node state XML
1019  * \param[in] node      Cluster node whose attributes are being unpacked
1020  * \param[in] data_set  Cluster working set
1021  */
1022 static void
1023 unpack_transient_attributes(xmlNode *state, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1024                             pe_working_set_t *data_set)
1025 {
1026     const char *discovery = NULL;
1027     xmlNode *attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
1028 
1029     add_node_attrs(attrs, node, TRUE, data_set);
1030 
1031     if (crm_is_true(pe_node_attribute_raw(node, "standby"))) {
1032         crm_info("Node %s is in standby-mode", node->details->uname);
1033         node->details->standby = TRUE;
1034     }
1035 
1036     if (crm_is_true(pe_node_attribute_raw(node, "maintenance"))) {
1037         crm_info("Node %s is in maintenance-mode", node->details->uname);
1038         node->details->maintenance = TRUE;
1039     }
1040 
1041     discovery = pe_node_attribute_raw(node, XML_NODE_ATTR_RSC_DISCOVERY);
1042     if ((discovery != NULL) && !crm_is_true(discovery)) {
1043         crm_warn("Ignoring %s attribute for node %s because disabling "
1044                  "resource discovery is not allowed for cluster nodes",
1045                  XML_NODE_ATTR_RSC_DISCOVERY, node->details->uname);
1046     }
1047 }
1048 
1049 /*!
1050  * \internal
1051  * \brief Unpack a node state entry (first pass)
1052  *
1053  * Unpack one node state entry from status. This unpacks information from the
1054  * node_state element itself and node attributes inside it, but not the
1055  * resource history inside it. Multiple passes through the status are needed to
1056  * fully unpack everything.
1057  *
1058  * \param[in] state     CIB node state XML
1059  * \param[in] data_set  Cluster working set
1060  */
1061 static void
1062 unpack_node_state(xmlNode *state, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1063 {
1064     const char *id = NULL;
1065     const char *uname = NULL;
1066     pe_node_t *this_node = NULL;
1067 
1068     id = crm_element_value(state, XML_ATTR_ID);
1069     if (id == NULL) {
1070         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1071                  XML_ATTR_ID);
1072         return;
1073     }
1074 
1075     uname = crm_element_value(state, XML_ATTR_UNAME);
1076     if (uname == NULL) {
1077         crm_warn("Ignoring malformed " XML_CIB_TAG_STATE " entry without "
1078                  XML_ATTR_UNAME);
1079         return;
1080     }
1081 
1082     this_node = pe_find_node_any(data_set->nodes, id, uname);
1083     if (this_node == NULL) {
1084         pcmk__config_warn("Ignoring recorded node state for '%s' because "
1085                           "it is no longer in the configuration", uname);
1086         return;
1087     }
1088 
1089     if (pe__is_guest_or_remote_node(this_node)) {
1090         /* We can't determine the online status of Pacemaker Remote nodes until
1091          * after all resource history has been unpacked. In this first pass, we
1092          * do need to mark whether the node has been fenced, as this plays a
1093          * role during unpacking cluster node resource state.
1094          */
1095         pcmk__scan_min_int(crm_element_value(state, XML_NODE_IS_FENCED),
1096                            &(this_node->details->remote_was_fenced), 0);
1097         return;
1098     }
1099 
1100     unpack_transient_attributes(state, this_node, data_set);
1101 
1102     /* Provisionally mark this cluster node as clean. We have at least seen it
1103      * in the current cluster's lifetime.
1104      */
1105     this_node->details->unclean = FALSE;
1106     this_node->details->unseen = FALSE;
1107 
1108     crm_trace("Determining online status of cluster node %s (id %s)",
1109               this_node->details->uname, id);
1110     determine_online_status(state, this_node, data_set);
1111 
1112     if (!pcmk_is_set(data_set->flags, pe_flag_have_quorum)
1113         && this_node->details->online
1114         && (data_set->no_quorum_policy == no_quorum_suicide)) {
1115         /* Everything else should flow from this automatically
1116          * (at least until the scheduler becomes able to migrate off
1117          * healthy resources)
1118          */
1119         pe_fence_node(data_set, this_node, "cluster does not have quorum",
1120                       FALSE);
1121     }
1122 }
1123 
1124 /*!
1125  * \internal
1126  * \brief Unpack nodes' resource history as much as possible
1127  *
1128  * Unpack as many nodes' resource history as possible in one pass through the
1129  * status. We need to process Pacemaker Remote nodes' connections/containers
1130  * before unpacking their history; the connection/container history will be
1131  * in another node's history, so it might take multiple passes to unpack
1132  * everything.
1133  *
1134  * \param[in] status    CIB XML status section
1135  * \param[in] fence     If true, treat any not-yet-unpacked nodes as unseen
1136  * \param[in] data_set  Cluster working set
1137  *
1138  * \return Standard Pacemaker return code (specifically pcmk_rc_ok if done,
1139  *         or EAGAIN if more unpacking remains to be done)
1140  */
1141 static int
1142 unpack_node_history(xmlNode *status, bool fence, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1143 {
1144     int rc = pcmk_rc_ok;
1145 
1146     // Loop through all node_state entries in CIB status
1147     for (xmlNode *state = first_named_child(status, XML_CIB_TAG_STATE);
1148          state != NULL; state = crm_next_same_xml(state)) {
1149 
1150         const char *id = ID(state);
1151         const char *uname = crm_element_value(state, XML_ATTR_UNAME);
1152         pe_node_t *this_node = NULL;
1153 
1154         if ((id == NULL) || (uname == NULL)) {
1155             // Warning already logged in first pass through status section
1156             crm_trace("Not unpacking resource history from malformed "
1157                       XML_CIB_TAG_STATE " without id and/or uname");
1158             continue;
1159         }
1160 
1161         this_node = pe_find_node_any(data_set->nodes, id, uname);
1162         if (this_node == NULL) {
1163             // Warning already logged in first pass through status section
1164             crm_trace("Not unpacking resource history for node %s because "
1165                       "no longer in configuration", id);
1166             continue;
1167         }
1168 
1169         if (this_node->details->unpacked) {
1170             crm_trace("Not unpacking resource history for node %s because "
1171                       "already unpacked", id);
1172             continue;
1173         }
1174 
1175         if (fence) {
1176             // We're processing all remaining nodes
1177 
1178         } else if (pe__is_guest_node(this_node)) {
1179             /* We can unpack a guest node's history only after we've unpacked
1180              * other resource history to the point that we know that the node's
1181              * connection and containing resource are both up.
1182              */
1183             pe_resource_t *rsc = this_node->details->remote_rsc;
1184 
1185             if ((rsc == NULL) || (rsc->role != RSC_ROLE_STARTED)
1186                 || (rsc->container->role != RSC_ROLE_STARTED)) {
1187                 crm_trace("Not unpacking resource history for guest node %s "
1188                           "because container and connection are not known to "
1189                           "be up", id);
1190                 continue;
1191             }
1192 
1193         } else if (pe__is_remote_node(this_node)) {
1194             /* We can unpack a remote node's history only after we've unpacked
1195              * other resource history to the point that we know that the node's
1196              * connection is up, with the exception of when shutdown locks are
1197              * in use.
1198              */
1199             pe_resource_t *rsc = this_node->details->remote_rsc;
1200 
1201             if ((rsc == NULL)
1202                 || (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)
1203                     && (rsc->role != RSC_ROLE_STARTED))) {
1204                 crm_trace("Not unpacking resource history for remote node %s "
1205                           "because connection is not known to be up", id);
1206                 continue;
1207             }
1208 
1209         /* If fencing and shutdown locks are disabled and we're not processing
1210          * unseen nodes, then we don't want to unpack offline nodes until online
1211          * nodes have been unpacked. This allows us to number active clone
1212          * instances first.
1213          */
1214         } else if (!pcmk_any_flags_set(data_set->flags, pe_flag_stonith_enabled
1215                                                         |pe_flag_shutdown_lock)
1216                    && !this_node->details->online) {
1217             crm_trace("Not unpacking resource history for offline "
1218                       "cluster node %s", id);
1219             continue;
1220         }
1221 
1222         if (pe__is_guest_or_remote_node(this_node)) {
1223             determine_remote_online_status(data_set, this_node);
1224             unpack_handle_remote_attrs(this_node, state, data_set);
1225         }
1226 
1227         crm_trace("Unpacking resource history for %snode %s",
1228                   (fence? "unseen " : ""), id);
1229 
1230         this_node->details->unpacked = TRUE;
1231         unpack_node_lrm(this_node, state, data_set);
1232 
1233         rc = EAGAIN; // Other node histories might depend on this one
1234     }
1235     return rc;
1236 }
1237 
1238 /* remove nodes that are down, stopping */
1239 /* create positive rsc_to_node constraints between resources and the nodes they are running on */
1240 /* anything else? */
1241 gboolean
1242 unpack_status(xmlNode * status, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1243 {
1244     xmlNode *state = NULL;
1245 
1246     crm_trace("Beginning unpack");
1247 
1248     if (data_set->tickets == NULL) {
1249         data_set->tickets = pcmk__strkey_table(free, destroy_ticket);
1250     }
1251 
1252     for (state = pcmk__xe_first_child(status); state != NULL;
1253          state = pcmk__xe_next(state)) {
1254 
1255         if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, pcmk__str_none)) {
1256             unpack_tickets_state((xmlNode *) state, data_set);
1257 
1258         } else if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
1259             unpack_node_state(state, data_set);
1260         }
1261     }
1262 
1263     while (unpack_node_history(status, FALSE, data_set) == EAGAIN) {
1264         crm_trace("Another pass through node resource histories is needed");
1265     }
1266 
1267     // Now catch any nodes we didn't see
1268     unpack_node_history(status,
1269                         pcmk_is_set(data_set->flags, pe_flag_stonith_enabled),
1270                         data_set);
1271 
1272     /* Now that we know where resources are, we can schedule stops of containers
1273      * with failed bundle connections
1274      */
1275     if (data_set->stop_needed != NULL) {
1276         for (GList *item = data_set->stop_needed; item; item = item->next) {
1277             pe_resource_t *container = item->data;
1278             pe_node_t *node = pe__current_node(container);
1279 
1280             if (node) {
1281                 stop_action(container, node, FALSE);
1282             }
1283         }
1284         g_list_free(data_set->stop_needed);
1285         data_set->stop_needed = NULL;
1286     }
1287 
1288     /* Now that we know status of all Pacemaker Remote connections and nodes,
1289      * we can stop connections for node shutdowns, and check the online status
1290      * of remote/guest nodes that didn't have any node history to unpack.
1291      */
1292     for (GList *gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
1293         pe_node_t *this_node = gIter->data;
1294 
1295         if (!pe__is_guest_or_remote_node(this_node)) {
1296             continue;
1297         }
1298         if (this_node->details->shutdown
1299             && (this_node->details->remote_rsc != NULL)) {
1300             pe__set_next_role(this_node->details->remote_rsc, RSC_ROLE_STOPPED,
1301                               "remote shutdown");
1302         }
1303         if (!this_node->details->unpacked) {
1304             determine_remote_online_status(data_set, this_node);
1305         }
1306     }
1307 
1308     return TRUE;
1309 }
1310 
1311 static gboolean
1312 determine_online_status_no_fencing(pe_working_set_t * data_set, xmlNode * node_state,
     /* [previous][next][first][last][top][bottom][index][help] */
1313                                    pe_node_t * this_node)
1314 {
1315     gboolean online = FALSE;
1316     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1317     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1318     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1319     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1320 
1321     if (!crm_is_true(in_cluster)) {
1322         crm_trace("Node is down: in_cluster=%s", crm_str(in_cluster));
1323 
1324     } else if (pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei)) {
1325         if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1326             online = TRUE;
1327         } else {
1328             crm_debug("Node is not ready to run resources: %s", join);
1329         }
1330 
1331     } else if (this_node->details->expected_up == FALSE) {
1332         crm_trace("Controller is down: in_cluster=%s", crm_str(in_cluster));
1333         crm_trace("\tis_peer=%s, join=%s, expected=%s",
1334                   crm_str(is_peer), crm_str(join), crm_str(exp_state));
1335 
1336     } else {
1337         /* mark it unclean */
1338         pe_fence_node(data_set, this_node, "peer is unexpectedly down", FALSE);
1339         crm_info("\tin_cluster=%s, is_peer=%s, join=%s, expected=%s",
1340                  crm_str(in_cluster), crm_str(is_peer), crm_str(join), crm_str(exp_state));
1341     }
1342     return online;
1343 }
1344 
1345 static gboolean
1346 determine_online_status_fencing(pe_working_set_t * data_set, xmlNode * node_state,
     /* [previous][next][first][last][top][bottom][index][help] */
1347                                 pe_node_t * this_node)
1348 {
1349     gboolean online = FALSE;
1350     gboolean do_terminate = FALSE;
1351     bool crmd_online = FALSE;
1352     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1353     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1354     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1355     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1356     const char *terminate = pe_node_attribute_raw(this_node, "terminate");
1357 
1358 /*
1359   - XML_NODE_IN_CLUSTER    ::= true|false
1360   - XML_NODE_IS_PEER       ::= online|offline
1361   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
1362   - XML_NODE_EXPECTED      ::= member|down
1363 */
1364 
1365     if (crm_is_true(terminate)) {
1366         do_terminate = TRUE;
1367 
1368     } else if (terminate != NULL && strlen(terminate) > 0) {
1369         /* could be a time() value */
1370         char t = terminate[0];
1371 
1372         if (t != '0' && isdigit(t)) {
1373             do_terminate = TRUE;
1374         }
1375     }
1376 
1377     crm_trace("%s: in_cluster=%s, is_peer=%s, join=%s, expected=%s, term=%d",
1378               this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1379               crm_str(join), crm_str(exp_state), do_terminate);
1380 
1381     online = crm_is_true(in_cluster);
1382     crmd_online = pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei);
1383     if (exp_state == NULL) {
1384         exp_state = CRMD_JOINSTATE_DOWN;
1385     }
1386 
1387     if (this_node->details->shutdown) {
1388         crm_debug("%s is shutting down", this_node->details->uname);
1389 
1390         /* Slightly different criteria since we can't shut down a dead peer */
1391         online = crmd_online;
1392 
1393     } else if (in_cluster == NULL) {
1394         pe_fence_node(data_set, this_node, "peer has not been seen by the cluster", FALSE);
1395 
1396     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_NACK, pcmk__str_casei)) {
1397         pe_fence_node(data_set, this_node,
1398                       "peer failed Pacemaker membership criteria", FALSE);
1399 
1400     } else if (do_terminate == FALSE && pcmk__str_eq(exp_state, CRMD_JOINSTATE_DOWN, pcmk__str_casei)) {
1401 
1402         if (crm_is_true(in_cluster) || crmd_online) {
1403             crm_info("- Node %s is not ready to run resources", this_node->details->uname);
1404             this_node->details->standby = TRUE;
1405             this_node->details->pending = TRUE;
1406 
1407         } else {
1408             crm_trace("%s is down or still coming up", this_node->details->uname);
1409         }
1410 
1411     } else if (do_terminate && pcmk__str_eq(join, CRMD_JOINSTATE_DOWN, pcmk__str_casei)
1412                && crm_is_true(in_cluster) == FALSE && !crmd_online) {
1413         crm_info("Node %s was just shot", this_node->details->uname);
1414         online = FALSE;
1415 
1416     } else if (crm_is_true(in_cluster) == FALSE) {
1417         // Consider `priority-fencing-delay` for lost nodes
1418         pe_fence_node(data_set, this_node, "peer is no longer part of the cluster", TRUE);
1419 
1420     } else if (!crmd_online) {
1421         pe_fence_node(data_set, this_node, "peer process is no longer available", FALSE);
1422 
1423         /* Everything is running at this point, now check join state */
1424     } else if (do_terminate) {
1425         pe_fence_node(data_set, this_node, "termination was requested", FALSE);
1426 
1427     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1428         crm_info("Node %s is active", this_node->details->uname);
1429 
1430     } else if (pcmk__strcase_any_of(join, CRMD_JOINSTATE_PENDING, CRMD_JOINSTATE_DOWN, NULL)) {
1431         crm_info("Node %s is not ready to run resources", this_node->details->uname);
1432         this_node->details->standby = TRUE;
1433         this_node->details->pending = TRUE;
1434 
1435     } else {
1436         pe_fence_node(data_set, this_node, "peer was in an unknown state", FALSE);
1437         crm_warn("%s: in-cluster=%s, is-peer=%s, join=%s, expected=%s, term=%d, shutdown=%d",
1438                  this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1439                  crm_str(join), crm_str(exp_state), do_terminate, this_node->details->shutdown);
1440     }
1441 
1442     return online;
1443 }
1444 
1445 static void
1446 determine_remote_online_status(pe_working_set_t * data_set, pe_node_t * this_node)
     /* [previous][next][first][last][top][bottom][index][help] */
1447 {
1448     pe_resource_t *rsc = this_node->details->remote_rsc;
1449     pe_resource_t *container = NULL;
1450     pe_node_t *host = NULL;
1451 
1452     /* If there is a node state entry for a (former) Pacemaker Remote node
1453      * but no resource creating that node, the node's connection resource will
1454      * be NULL. Consider it an offline remote node in that case.
1455      */
1456     if (rsc == NULL) {
1457         this_node->details->online = FALSE;
1458         goto remote_online_done;
1459     }
1460 
1461     container = rsc->container;
1462 
1463     if (container && pcmk__list_of_1(rsc->running_on)) {
1464         host = rsc->running_on->data;
1465     }
1466 
1467     /* If the resource is currently started, mark it online. */
1468     if (rsc->role == RSC_ROLE_STARTED) {
1469         crm_trace("%s node %s presumed ONLINE because connection resource is started",
1470                   (container? "Guest" : "Remote"), this_node->details->id);
1471         this_node->details->online = TRUE;
1472     }
1473 
1474     /* consider this node shutting down if transitioning start->stop */
1475     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
1476         crm_trace("%s node %s shutting down because connection resource is stopping",
1477                   (container? "Guest" : "Remote"), this_node->details->id);
1478         this_node->details->shutdown = TRUE;
1479     }
1480 
1481     /* Now check all the failure conditions. */
1482     if(container && pcmk_is_set(container->flags, pe_rsc_failed)) {
1483         crm_trace("Guest node %s UNCLEAN because guest resource failed",
1484                   this_node->details->id);
1485         this_node->details->online = FALSE;
1486         this_node->details->remote_requires_reset = TRUE;
1487 
1488     } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
1489         crm_trace("%s node %s OFFLINE because connection resource failed",
1490                   (container? "Guest" : "Remote"), this_node->details->id);
1491         this_node->details->online = FALSE;
1492 
1493     } else if (rsc->role == RSC_ROLE_STOPPED
1494         || (container && container->role == RSC_ROLE_STOPPED)) {
1495 
1496         crm_trace("%s node %s OFFLINE because its resource is stopped",
1497                   (container? "Guest" : "Remote"), this_node->details->id);
1498         this_node->details->online = FALSE;
1499         this_node->details->remote_requires_reset = FALSE;
1500 
1501     } else if (host && (host->details->online == FALSE)
1502                && host->details->unclean) {
1503         crm_trace("Guest node %s UNCLEAN because host is unclean",
1504                   this_node->details->id);
1505         this_node->details->online = FALSE;
1506         this_node->details->remote_requires_reset = TRUE;
1507     }
1508 
1509 remote_online_done:
1510     crm_trace("Remote node %s online=%s",
1511         this_node->details->id, this_node->details->online ? "TRUE" : "FALSE");
1512 }
1513 
1514 static void
1515 determine_online_status(xmlNode * node_state, pe_node_t * this_node, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1516 {
1517     gboolean online = FALSE;
1518     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1519 
1520     CRM_CHECK(this_node != NULL, return);
1521 
1522     this_node->details->shutdown = FALSE;
1523     this_node->details->expected_up = FALSE;
1524 
1525     if (pe__shutdown_requested(this_node)) {
1526         this_node->details->shutdown = TRUE;
1527 
1528     } else if (pcmk__str_eq(exp_state, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1529         this_node->details->expected_up = TRUE;
1530     }
1531 
1532     if (this_node->details->type == node_ping) {
1533         this_node->details->unclean = FALSE;
1534         online = FALSE;         /* As far as resource management is concerned,
1535                                  * the node is safely offline.
1536                                  * Anyone caught abusing this logic will be shot
1537                                  */
1538 
1539     } else if (!pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1540         online = determine_online_status_no_fencing(data_set, node_state, this_node);
1541 
1542     } else {
1543         online = determine_online_status_fencing(data_set, node_state, this_node);
1544     }
1545 
1546     if (online) {
1547         this_node->details->online = TRUE;
1548 
1549     } else {
1550         /* remove node from contention */
1551         this_node->fixed = TRUE;
1552         this_node->weight = -INFINITY;
1553     }
1554 
1555     if (online && this_node->details->shutdown) {
1556         /* don't run resources here */
1557         this_node->fixed = TRUE;
1558         this_node->weight = -INFINITY;
1559     }
1560 
1561     if (this_node->details->type == node_ping) {
1562         crm_info("Node %s is not a Pacemaker node", this_node->details->uname);
1563 
1564     } else if (this_node->details->unclean) {
1565         pe_proc_warn("Node %s is unclean", this_node->details->uname);
1566 
1567     } else if (this_node->details->online) {
1568         crm_info("Node %s is %s", this_node->details->uname,
1569                  this_node->details->shutdown ? "shutting down" :
1570                  this_node->details->pending ? "pending" :
1571                  this_node->details->standby ? "standby" :
1572                  this_node->details->maintenance ? "maintenance" : "online");
1573 
1574     } else {
1575         crm_trace("Node %s is offline", this_node->details->uname);
1576     }
1577 }
1578 
1579 /*!
1580  * \internal
1581  * \brief Find the end of a resource's name, excluding any clone suffix
1582  *
1583  * \param[in] id  Resource ID to check
1584  *
1585  * \return Pointer to last character of resource's base name
1586  */
1587 const char *
1588 pe_base_name_end(const char *id)
     /* [previous][next][first][last][top][bottom][index][help] */
1589 {
1590     if (!pcmk__str_empty(id)) {
1591         const char *end = id + strlen(id) - 1;
1592 
1593         for (const char *s = end; s > id; --s) {
1594             switch (*s) {
1595                 case '0':
1596                 case '1':
1597                 case '2':
1598                 case '3':
1599                 case '4':
1600                 case '5':
1601                 case '6':
1602                 case '7':
1603                 case '8':
1604                 case '9':
1605                     break;
1606                 case ':':
1607                     return (s == end)? s : (s - 1);
1608                 default:
1609                     return end;
1610             }
1611         }
1612         return end;
1613     }
1614     return NULL;
1615 }
1616 
1617 /*!
1618  * \internal
1619  * \brief Get a resource name excluding any clone suffix
1620  *
1621  * \param[in] last_rsc_id  Resource ID to check
1622  *
1623  * \return Pointer to newly allocated string with resource's base name
1624  * \note It is the caller's responsibility to free() the result.
1625  *       This asserts on error, so callers can assume result is not NULL.
1626  */
1627 char *
1628 clone_strip(const char *last_rsc_id)
     /* [previous][next][first][last][top][bottom][index][help] */
1629 {
1630     const char *end = pe_base_name_end(last_rsc_id);
1631     char *basename = NULL;
1632 
1633     CRM_ASSERT(end);
1634     basename = strndup(last_rsc_id, end - last_rsc_id + 1);
1635     CRM_ASSERT(basename);
1636     return basename;
1637 }
1638 
1639 /*!
1640  * \internal
1641  * \brief Get the name of the first instance of a cloned resource
1642  *
1643  * \param[in] last_rsc_id  Resource ID to check
1644  *
1645  * \return Pointer to newly allocated string with resource's base name plus :0
1646  * \note It is the caller's responsibility to free() the result.
1647  *       This asserts on error, so callers can assume result is not NULL.
1648  */
1649 char *
1650 clone_zero(const char *last_rsc_id)
     /* [previous][next][first][last][top][bottom][index][help] */
1651 {
1652     const char *end = pe_base_name_end(last_rsc_id);
1653     size_t base_name_len = end - last_rsc_id + 1;
1654     char *zero = NULL;
1655 
1656     CRM_ASSERT(end);
1657     zero = calloc(base_name_len + 3, sizeof(char));
1658     CRM_ASSERT(zero);
1659     memcpy(zero, last_rsc_id, base_name_len);
1660     zero[base_name_len] = ':';
1661     zero[base_name_len + 1] = '0';
1662     return zero;
1663 }
1664 
1665 static pe_resource_t *
1666 create_fake_resource(const char *rsc_id, xmlNode * rsc_entry, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1667 {
1668     pe_resource_t *rsc = NULL;
1669     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
1670 
1671     copy_in_properties(xml_rsc, rsc_entry);
1672     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
1673     crm_log_xml_debug(xml_rsc, "Orphan resource");
1674 
1675     if (!common_unpack(xml_rsc, &rsc, NULL, data_set)) {
1676         return NULL;
1677     }
1678 
1679     if (xml_contains_remote_node(xml_rsc)) {
1680         pe_node_t *node;
1681 
1682         crm_debug("Detected orphaned remote node %s", rsc_id);
1683         node = pe_find_node(data_set->nodes, rsc_id);
1684         if (node == NULL) {
1685                 node = pe_create_node(rsc_id, rsc_id, "remote", NULL, data_set);
1686         }
1687         link_rsc2remotenode(data_set, rsc);
1688 
1689         if (node) {
1690             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
1691             node->details->shutdown = TRUE;
1692         }
1693     }
1694 
1695     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
1696         /* This orphaned rsc needs to be mapped to a container. */
1697         crm_trace("Detected orphaned container filler %s", rsc_id);
1698         pe__set_resource_flags(rsc, pe_rsc_orphan_container_filler);
1699     }
1700     pe__set_resource_flags(rsc, pe_rsc_orphan);
1701     data_set->resources = g_list_append(data_set->resources, rsc);
1702     return rsc;
1703 }
1704 
1705 /*!
1706  * \internal
1707  * \brief Create orphan instance for anonymous clone resource history
1708  */
1709 static pe_resource_t *
1710 create_anonymous_orphan(pe_resource_t *parent, const char *rsc_id,
     /* [previous][next][first][last][top][bottom][index][help] */
1711                         pe_node_t *node, pe_working_set_t *data_set)
1712 {
1713     pe_resource_t *top = pe__create_clone_child(parent, data_set);
1714 
1715     // find_rsc() because we might be a cloned group
1716     pe_resource_t *orphan = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
1717 
1718     pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s",
1719                  top->id, parent->id, rsc_id, node->details->uname);
1720     return orphan;
1721 }
1722 
1723 /*!
1724  * \internal
1725  * \brief Check a node for an instance of an anonymous clone
1726  *
1727  * Return a child instance of the specified anonymous clone, in order of
1728  * preference: (1) the instance running on the specified node, if any;
1729  * (2) an inactive instance (i.e. within the total of clone-max instances);
1730  * (3) a newly created orphan (i.e. clone-max instances are already active).
1731  *
1732  * \param[in] data_set  Cluster information
1733  * \param[in] node      Node on which to check for instance
1734  * \param[in] parent    Clone to check
1735  * \param[in] rsc_id    Name of cloned resource in history (without instance)
1736  */
1737 static pe_resource_t *
1738 find_anonymous_clone(pe_working_set_t * data_set, pe_node_t * node, pe_resource_t * parent,
     /* [previous][next][first][last][top][bottom][index][help] */
1739                      const char *rsc_id)
1740 {
1741     GList *rIter = NULL;
1742     pe_resource_t *rsc = NULL;
1743     pe_resource_t *inactive_instance = NULL;
1744     gboolean skip_inactive = FALSE;
1745 
1746     CRM_ASSERT(parent != NULL);
1747     CRM_ASSERT(pe_rsc_is_clone(parent));
1748     CRM_ASSERT(!pcmk_is_set(parent->flags, pe_rsc_unique));
1749 
1750     // Check for active (or partially active, for cloned groups) instance
1751     pe_rsc_trace(parent, "Looking for %s on %s in %s", rsc_id, node->details->uname, parent->id);
1752     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1753         GList *locations = NULL;
1754         pe_resource_t *child = rIter->data;
1755 
1756         /* Check whether this instance is already known to be active or pending
1757          * anywhere, at this stage of unpacking. Because this function is called
1758          * for a resource before the resource's individual operation history
1759          * entries are unpacked, locations will generally not contain the
1760          * desired node.
1761          *
1762          * However, there are three exceptions:
1763          * (1) when child is a cloned group and we have already unpacked the
1764          *     history of another member of the group on the same node;
1765          * (2) when we've already unpacked the history of another numbered
1766          *     instance on the same node (which can happen if globally-unique
1767          *     was flipped from true to false); and
1768          * (3) when we re-run calculations on the same data set as part of a
1769          *     simulation.
1770          */
1771         child->fns->location(child, &locations, 2);
1772         if (locations) {
1773             /* We should never associate the same numbered anonymous clone
1774              * instance with multiple nodes, and clone instances can't migrate,
1775              * so there must be only one location, regardless of history.
1776              */
1777             CRM_LOG_ASSERT(locations->next == NULL);
1778 
1779             if (((pe_node_t *)locations->data)->details == node->details) {
1780                 /* This child instance is active on the requested node, so check
1781                  * for a corresponding configured resource. We use find_rsc()
1782                  * instead of child because child may be a cloned group, and we
1783                  * need the particular member corresponding to rsc_id.
1784                  *
1785                  * If the history entry is orphaned, rsc will be NULL.
1786                  */
1787                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1788                 if (rsc) {
1789                     /* If there are multiple instance history entries for an
1790                      * anonymous clone in a single node's history (which can
1791                      * happen if globally-unique is switched from true to
1792                      * false), we want to consider the instances beyond the
1793                      * first as orphans, even if there are inactive instance
1794                      * numbers available.
1795                      */
1796                     if (rsc->running_on) {
1797                         crm_notice("Active (now-)anonymous clone %s has "
1798                                    "multiple (orphan) instance histories on %s",
1799                                    parent->id, node->details->uname);
1800                         skip_inactive = TRUE;
1801                         rsc = NULL;
1802                     } else {
1803                         pe_rsc_trace(parent, "Resource %s, active", rsc->id);
1804                     }
1805                 }
1806             }
1807             g_list_free(locations);
1808 
1809         } else {
1810             pe_rsc_trace(parent, "Resource %s, skip inactive", child->id);
1811             if (!skip_inactive && !inactive_instance
1812                 && !pcmk_is_set(child->flags, pe_rsc_block)) {
1813                 // Remember one inactive instance in case we don't find active
1814                 inactive_instance = parent->fns->find_rsc(child, rsc_id, NULL,
1815                                                           pe_find_clone);
1816 
1817                 /* ... but don't use it if it was already associated with a
1818                  * pending action on another node
1819                  */
1820                 if (inactive_instance && inactive_instance->pending_node
1821                     && (inactive_instance->pending_node->details != node->details)) {
1822                     inactive_instance = NULL;
1823                 }
1824             }
1825         }
1826     }
1827 
1828     if ((rsc == NULL) && !skip_inactive && (inactive_instance != NULL)) {
1829         pe_rsc_trace(parent, "Resource %s, empty slot", inactive_instance->id);
1830         rsc = inactive_instance;
1831     }
1832 
1833     /* If the resource has "requires" set to "quorum" or "nothing", and we don't
1834      * have a clone instance for every node, we don't want to consume a valid
1835      * instance number for unclean nodes. Such instances may appear to be active
1836      * according to the history, but should be considered inactive, so we can
1837      * start an instance elsewhere. Treat such instances as orphans.
1838      *
1839      * An exception is instances running on guest nodes -- since guest node
1840      * "fencing" is actually just a resource stop, requires shouldn't apply.
1841      *
1842      * @TODO Ideally, we'd use an inactive instance number if it is not needed
1843      * for any clean instances. However, we don't know that at this point.
1844      */
1845     if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_needs_fencing)
1846         && (!node->details->online || node->details->unclean)
1847         && !pe__is_guest_node(node)
1848         && !pe__is_universal_clone(parent, data_set)) {
1849 
1850         rsc = NULL;
1851     }
1852 
1853     if (rsc == NULL) {
1854         rsc = create_anonymous_orphan(parent, rsc_id, node, data_set);
1855         pe_rsc_trace(parent, "Resource %s, orphan", rsc->id);
1856     }
1857     return rsc;
1858 }
1859 
1860 static pe_resource_t *
1861 unpack_find_resource(pe_working_set_t * data_set, pe_node_t * node, const char *rsc_id,
     /* [previous][next][first][last][top][bottom][index][help] */
1862                      xmlNode * rsc_entry)
1863 {
1864     pe_resource_t *rsc = NULL;
1865     pe_resource_t *parent = NULL;
1866 
1867     crm_trace("looking for %s", rsc_id);
1868     rsc = pe_find_resource(data_set->resources, rsc_id);
1869 
1870     if (rsc == NULL) {
1871         /* If we didn't find the resource by its name in the operation history,
1872          * check it again as a clone instance. Even when clone-max=0, we create
1873          * a single :0 orphan to match against here.
1874          */
1875         char *clone0_id = clone_zero(rsc_id);
1876         pe_resource_t *clone0 = pe_find_resource(data_set->resources, clone0_id);
1877 
1878         if (clone0 && !pcmk_is_set(clone0->flags, pe_rsc_unique)) {
1879             rsc = clone0;
1880             parent = uber_parent(clone0);
1881             crm_trace("%s found as %s (%s)", rsc_id, clone0_id, parent->id);
1882         } else {
1883             crm_trace("%s is not known as %s either (orphan)",
1884                       rsc_id, clone0_id);
1885         }
1886         free(clone0_id);
1887 
1888     } else if (rsc->variant > pe_native) {
1889         crm_trace("Resource history for %s is orphaned because it is no longer primitive",
1890                   rsc_id);
1891         return NULL;
1892 
1893     } else {
1894         parent = uber_parent(rsc);
1895     }
1896 
1897     if (pe_rsc_is_anon_clone(parent)) {
1898 
1899         if (pe_rsc_is_bundled(parent)) {
1900             rsc = pe__find_bundle_replica(parent->parent, node);
1901         } else {
1902             char *base = clone_strip(rsc_id);
1903 
1904             rsc = find_anonymous_clone(data_set, node, parent, base);
1905             free(base);
1906             CRM_ASSERT(rsc != NULL);
1907         }
1908     }
1909 
1910     if (rsc && !pcmk__str_eq(rsc_id, rsc->id, pcmk__str_casei)
1911         && !pcmk__str_eq(rsc_id, rsc->clone_name, pcmk__str_casei)) {
1912 
1913         free(rsc->clone_name);
1914         rsc->clone_name = strdup(rsc_id);
1915         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
1916                      rsc_id, node->details->uname, rsc->id,
1917                      (pcmk_is_set(rsc->flags, pe_rsc_orphan)? " (ORPHAN)" : ""));
1918     }
1919     return rsc;
1920 }
1921 
1922 static pe_resource_t *
1923 process_orphan_resource(xmlNode * rsc_entry, pe_node_t * node, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1924 {
1925     pe_resource_t *rsc = NULL;
1926     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
1927 
1928     crm_debug("Detected orphan resource %s on %s", rsc_id, node->details->uname);
1929     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
1930     if (rsc == NULL) {
1931         return NULL;
1932     }
1933 
1934     if (!pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)) {
1935         pe__clear_resource_flags(rsc, pe_rsc_managed);
1936 
1937     } else {
1938         CRM_CHECK(rsc != NULL, return NULL);
1939         pe_rsc_trace(rsc, "Added orphan %s", rsc->id);
1940         resource_location(rsc, NULL, -INFINITY, "__orphan_do_not_run__", data_set);
1941     }
1942     return rsc;
1943 }
1944 
1945 static void
1946 process_rsc_state(pe_resource_t * rsc, pe_node_t * node,
     /* [previous][next][first][last][top][bottom][index][help] */
1947                   enum action_fail_response on_fail,
1948                   xmlNode * migrate_op, pe_working_set_t * data_set)
1949 {
1950     pe_node_t *tmpnode = NULL;
1951     char *reason = NULL;
1952     enum action_fail_response save_on_fail = action_fail_ignore;
1953 
1954     CRM_ASSERT(rsc);
1955     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
1956                  rsc->id, role2text(rsc->role), node->details->uname, fail2text(on_fail));
1957 
1958     /* process current state */
1959     if (rsc->role != RSC_ROLE_UNKNOWN) {
1960         pe_resource_t *iter = rsc;
1961 
1962         while (iter) {
1963             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
1964                 pe_node_t *n = pe__copy_node(node);
1965 
1966                 pe_rsc_trace(rsc, "%s (aka. %s) known on %s", rsc->id, rsc->clone_name,
1967                              n->details->uname);
1968                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
1969             }
1970             if (pcmk_is_set(iter->flags, pe_rsc_unique)) {
1971                 break;
1972             }
1973             iter = iter->parent;
1974         }
1975     }
1976 
1977     /* If a managed resource is believed to be running, but node is down ... */
1978     if (rsc->role > RSC_ROLE_STOPPED
1979         && node->details->online == FALSE
1980         && node->details->maintenance == FALSE
1981         && pcmk_is_set(rsc->flags, pe_rsc_managed)) {
1982 
1983         gboolean should_fence = FALSE;
1984 
1985         /* If this is a guest node, fence it (regardless of whether fencing is
1986          * enabled, because guest node fencing is done by recovery of the
1987          * container resource rather than by the fencer). Mark the resource
1988          * we're processing as failed. When the guest comes back up, its
1989          * operation history in the CIB will be cleared, freeing the affected
1990          * resource to run again once we are sure we know its state.
1991          */
1992         if (pe__is_guest_node(node)) {
1993             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
1994             should_fence = TRUE;
1995 
1996         } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1997             if (pe__is_remote_node(node) && node->details->remote_rsc
1998                 && !pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
1999 
2000                 /* Setting unseen means that fencing of the remote node will
2001                  * occur only if the connection resource is not going to start
2002                  * somewhere. This allows connection resources on a failed
2003                  * cluster node to move to another node without requiring the
2004                  * remote nodes to be fenced as well.
2005                  */
2006                 node->details->unseen = TRUE;
2007                 reason = crm_strdup_printf("%s is active there (fencing will be"
2008                                            " revoked if remote connection can "
2009                                            "be re-established elsewhere)",
2010                                            rsc->id);
2011             }
2012             should_fence = TRUE;
2013         }
2014 
2015         if (should_fence) {
2016             if (reason == NULL) {
2017                reason = crm_strdup_printf("%s is thought to be active there", rsc->id);
2018             }
2019             pe_fence_node(data_set, node, reason, FALSE);
2020         }
2021         free(reason);
2022     }
2023 
2024     /* In order to calculate priority_fencing_delay correctly, save the failure information and pass it to native_add_running(). */
2025     save_on_fail = on_fail;
2026 
2027     if (node->details->unclean) {
2028         /* No extra processing needed
2029          * Also allows resources to be started again after a node is shot
2030          */
2031         on_fail = action_fail_ignore;
2032     }
2033 
2034     switch (on_fail) {
2035         case action_fail_ignore:
2036             /* nothing to do */
2037             break;
2038 
2039         case action_fail_demote:
2040             pe__set_resource_flags(rsc, pe_rsc_failed);
2041             demote_action(rsc, node, FALSE);
2042             break;
2043 
2044         case action_fail_fence:
2045             /* treat it as if it is still running
2046              * but also mark the node as unclean
2047              */
2048             reason = crm_strdup_printf("%s failed there", rsc->id);
2049             pe_fence_node(data_set, node, reason, FALSE);
2050             free(reason);
2051             break;
2052 
2053         case action_fail_standby:
2054             node->details->standby = TRUE;
2055             node->details->standby_onfail = TRUE;
2056             break;
2057 
2058         case action_fail_block:
2059             /* is_managed == FALSE will prevent any
2060              * actions being sent for the resource
2061              */
2062             pe__clear_resource_flags(rsc, pe_rsc_managed);
2063             pe__set_resource_flags(rsc, pe_rsc_block);
2064             break;
2065 
2066         case action_fail_migrate:
2067             /* make sure it comes up somewhere else
2068              * or not at all
2069              */
2070             resource_location(rsc, node, -INFINITY, "__action_migration_auto__", data_set);
2071             break;
2072 
2073         case action_fail_stop:
2074             pe__set_next_role(rsc, RSC_ROLE_STOPPED, "on-fail=stop");
2075             break;
2076 
2077         case action_fail_recover:
2078             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2079                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2080                 stop_action(rsc, node, FALSE);
2081             }
2082             break;
2083 
2084         case action_fail_restart_container:
2085             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2086             if (rsc->container && pe_rsc_is_bundled(rsc)) {
2087                 /* A bundle's remote connection can run on a different node than
2088                  * the bundle's container. We don't necessarily know where the
2089                  * container is running yet, so remember it and add a stop
2090                  * action for it later.
2091                  */
2092                 data_set->stop_needed = g_list_prepend(data_set->stop_needed,
2093                                                        rsc->container);
2094             } else if (rsc->container) {
2095                 stop_action(rsc->container, node, FALSE);
2096             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2097                 stop_action(rsc, node, FALSE);
2098             }
2099             break;
2100 
2101         case action_fail_reset_remote:
2102             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2103             if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
2104                 tmpnode = NULL;
2105                 if (rsc->is_remote_node) {
2106                     tmpnode = pe_find_node(data_set->nodes, rsc->id);
2107                 }
2108                 if (tmpnode &&
2109                     pe__is_remote_node(tmpnode) &&
2110                     tmpnode->details->remote_was_fenced == 0) {
2111 
2112                     /* The remote connection resource failed in a way that
2113                      * should result in fencing the remote node.
2114                      */
2115                     pe_fence_node(data_set, tmpnode,
2116                                   "remote connection is unrecoverable", FALSE);
2117                 }
2118             }
2119 
2120             /* require the stop action regardless if fencing is occurring or not. */
2121             if (rsc->role > RSC_ROLE_STOPPED) {
2122                 stop_action(rsc, node, FALSE);
2123             }
2124 
2125             /* if reconnect delay is in use, prevent the connection from exiting the
2126              * "STOPPED" role until the failure is cleared by the delay timeout. */
2127             if (rsc->remote_reconnect_ms) {
2128                 pe__set_next_role(rsc, RSC_ROLE_STOPPED, "remote reset");
2129             }
2130             break;
2131     }
2132 
2133     /* ensure a remote-node connection failure forces an unclean remote-node
2134      * to be fenced. By setting unseen = FALSE, the remote-node failure will
2135      * result in a fencing operation regardless if we're going to attempt to 
2136      * reconnect to the remote-node in this transition or not. */
2137     if (pcmk_is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
2138         tmpnode = pe_find_node(data_set->nodes, rsc->id);
2139         if (tmpnode && tmpnode->details->unclean) {
2140             tmpnode->details->unseen = FALSE;
2141         }
2142     }
2143 
2144     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2145         if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
2146             if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2147                 pcmk__config_warn("Detected active orphan %s running on %s",
2148                                   rsc->id, node->details->uname);
2149             } else {
2150                 pcmk__config_warn("Resource '%s' must be stopped manually on "
2151                                   "%s because cluster is configured not to "
2152                                   "stop active orphans",
2153                                   rsc->id, node->details->uname);
2154             }
2155         }
2156 
2157         native_add_running(rsc, node, data_set, (save_on_fail != action_fail_ignore));
2158         switch (on_fail) {
2159             case action_fail_ignore:
2160                 break;
2161             case action_fail_demote:
2162             case action_fail_block:
2163                 pe__set_resource_flags(rsc, pe_rsc_failed);
2164                 break;
2165             default:
2166                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2167                 break;
2168         }
2169 
2170     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
2171         /* Only do this for older status sections that included instance numbers
2172          * Otherwise stopped instances will appear as orphans
2173          */
2174         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
2175         free(rsc->clone_name);
2176         rsc->clone_name = NULL;
2177 
2178     } else {
2179         GList *possible_matches = pe__resource_actions(rsc, node, RSC_STOP,
2180                                                        FALSE);
2181         GList *gIter = possible_matches;
2182 
2183         for (; gIter != NULL; gIter = gIter->next) {
2184             pe_action_t *stop = (pe_action_t *) gIter->data;
2185 
2186             pe__set_action_flags(stop, pe_action_optional);
2187         }
2188 
2189         g_list_free(possible_matches);
2190     }
2191 }
2192 
2193 /* create active recurring operations as optional */
2194 static void
2195 process_recurring(pe_node_t * node, pe_resource_t * rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
2196                   int start_index, int stop_index,
2197                   GList *sorted_op_list, pe_working_set_t * data_set)
2198 {
2199     int counter = -1;
2200     const char *task = NULL;
2201     const char *status = NULL;
2202     GList *gIter = sorted_op_list;
2203 
2204     CRM_ASSERT(rsc);
2205     pe_rsc_trace(rsc, "%s: Start index %d, stop index = %d", rsc->id, start_index, stop_index);
2206 
2207     for (; gIter != NULL; gIter = gIter->next) {
2208         xmlNode *rsc_op = (xmlNode *) gIter->data;
2209 
2210         guint interval_ms = 0;
2211         char *key = NULL;
2212         const char *id = ID(rsc_op);
2213 
2214         counter++;
2215 
2216         if (node->details->online == FALSE) {
2217             pe_rsc_trace(rsc, "Skipping %s/%s: node is offline", rsc->id, node->details->uname);
2218             break;
2219 
2220             /* Need to check if there's a monitor for role="Stopped" */
2221         } else if (start_index < stop_index && counter <= stop_index) {
2222             pe_rsc_trace(rsc, "Skipping %s/%s: resource is not active", id, node->details->uname);
2223             continue;
2224 
2225         } else if (counter < start_index) {
2226             pe_rsc_trace(rsc, "Skipping %s/%s: old %d", id, node->details->uname, counter);
2227             continue;
2228         }
2229 
2230         crm_element_value_ms(rsc_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
2231         if (interval_ms == 0) {
2232             pe_rsc_trace(rsc, "Skipping %s/%s: non-recurring", id, node->details->uname);
2233             continue;
2234         }
2235 
2236         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2237         if (pcmk__str_eq(status, "-1", pcmk__str_casei)) {
2238             pe_rsc_trace(rsc, "Skipping %s/%s: status", id, node->details->uname);
2239             continue;
2240         }
2241         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2242         /* create the action */
2243         key = pcmk__op_key(rsc->id, task, interval_ms);
2244         pe_rsc_trace(rsc, "Creating %s/%s", key, node->details->uname);
2245         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
2246     }
2247 }
2248 
2249 void
2250 calculate_active_ops(GList *sorted_op_list, int *start_index, int *stop_index)
     /* [previous][next][first][last][top][bottom][index][help] */
2251 {
2252     int counter = -1;
2253     int implied_monitor_start = -1;
2254     int implied_clone_start = -1;
2255     const char *task = NULL;
2256     const char *status = NULL;
2257     GList *gIter = sorted_op_list;
2258 
2259     *stop_index = -1;
2260     *start_index = -1;
2261 
2262     for (; gIter != NULL; gIter = gIter->next) {
2263         xmlNode *rsc_op = (xmlNode *) gIter->data;
2264 
2265         counter++;
2266 
2267         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2268         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2269 
2270         if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)
2271             && pcmk__str_eq(status, "0", pcmk__str_casei)) {
2272             *stop_index = counter;
2273 
2274         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_START, CRMD_ACTION_MIGRATED, NULL)) {
2275             *start_index = counter;
2276 
2277         } else if ((implied_monitor_start <= *stop_index) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
2278             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
2279 
2280             if (pcmk__strcase_any_of(rc, "0", "8", NULL)) {
2281                 implied_monitor_start = counter;
2282             }
2283         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_PROMOTE, CRMD_ACTION_DEMOTE, NULL)) {
2284             implied_clone_start = counter;
2285         }
2286     }
2287 
2288     if (*start_index == -1) {
2289         if (implied_clone_start != -1) {
2290             *start_index = implied_clone_start;
2291         } else if (implied_monitor_start != -1) {
2292             *start_index = implied_monitor_start;
2293         }
2294     }
2295 }
2296 
2297 // If resource history entry has shutdown lock, remember lock node and time
2298 static void
2299 unpack_shutdown_lock(xmlNode *rsc_entry, pe_resource_t *rsc, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
2300                      pe_working_set_t *data_set)
2301 {
2302     time_t lock_time = 0;   // When lock started (i.e. node shutdown time)
2303 
2304     if ((crm_element_value_epoch(rsc_entry, XML_CONFIG_ATTR_SHUTDOWN_LOCK,
2305                                  &lock_time) == pcmk_ok) && (lock_time != 0)) {
2306 
2307         if ((data_set->shutdown_lock > 0)
2308             && (get_effective_time(data_set)
2309                 > (lock_time + data_set->shutdown_lock))) {
2310             pe_rsc_info(rsc, "Shutdown lock for %s on %s expired",
2311                         rsc->id, node->details->uname);
2312             pe__clear_resource_history(rsc, node, data_set);
2313         } else {
2314             rsc->lock_node = node;
2315             rsc->lock_time = lock_time;
2316         }
2317     }
2318 }
2319 
2320 /*!
2321  * \internal
2322  * \brief Unpack one lrm_resource entry from a node's CIB status
2323  *
2324  * \param[in] node       Node whose status is being unpacked
2325  * \param[in] rsc_entry  lrm_resource XML being unpacked
2326  * \param[in] data_set   Cluster working set
2327  *
2328  * \return Resource corresponding to the entry, or NULL if no operation history
2329  */
2330 static pe_resource_t *
2331 unpack_lrm_resource(pe_node_t *node, xmlNode *lrm_resource,
     /* [previous][next][first][last][top][bottom][index][help] */
2332                     pe_working_set_t *data_set)
2333 {
2334     GList *gIter = NULL;
2335     int stop_index = -1;
2336     int start_index = -1;
2337     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
2338 
2339     const char *task = NULL;
2340     const char *rsc_id = ID(lrm_resource);
2341 
2342     pe_resource_t *rsc = NULL;
2343     GList *op_list = NULL;
2344     GList *sorted_op_list = NULL;
2345 
2346     xmlNode *migrate_op = NULL;
2347     xmlNode *rsc_op = NULL;
2348     xmlNode *last_failure = NULL;
2349 
2350     enum action_fail_response on_fail = action_fail_ignore;
2351     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
2352 
2353     if (rsc_id == NULL) {
2354         crm_warn("Ignoring malformed " XML_LRM_TAG_RESOURCE
2355                  " entry without id");
2356         return NULL;
2357     }
2358     crm_trace("Unpacking " XML_LRM_TAG_RESOURCE " for %s on %s",
2359               rsc_id, node->details->uname);
2360 
2361     // Build a list of individual lrm_rsc_op entries, so we can sort them
2362     for (rsc_op = first_named_child(lrm_resource, XML_LRM_TAG_RSC_OP);
2363          rsc_op != NULL; rsc_op = crm_next_same_xml(rsc_op)) {
2364 
2365         op_list = g_list_prepend(op_list, rsc_op);
2366     }
2367 
2368     if (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2369         if (op_list == NULL) {
2370             // If there are no operations, there is nothing to do
2371             return NULL;
2372         }
2373     }
2374 
2375     /* find the resource */
2376     rsc = unpack_find_resource(data_set, node, rsc_id, lrm_resource);
2377     if (rsc == NULL) {
2378         if (op_list == NULL) {
2379             // If there are no operations, there is nothing to do
2380             return NULL;
2381         } else {
2382             rsc = process_orphan_resource(lrm_resource, node, data_set);
2383         }
2384     }
2385     CRM_ASSERT(rsc != NULL);
2386 
2387     // Check whether the resource is "shutdown-locked" to this node
2388     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2389         unpack_shutdown_lock(lrm_resource, rsc, node, data_set);
2390     }
2391 
2392     /* process operations */
2393     saved_role = rsc->role;
2394     rsc->role = RSC_ROLE_UNKNOWN;
2395     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
2396 
2397     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
2398         xmlNode *rsc_op = (xmlNode *) gIter->data;
2399 
2400         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2401         if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2402             migrate_op = rsc_op;
2403         }
2404 
2405         unpack_rsc_op(rsc, node, rsc_op, &last_failure, &on_fail, data_set);
2406     }
2407 
2408     /* create active recurring operations as optional */
2409     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
2410     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
2411 
2412     /* no need to free the contents */
2413     g_list_free(sorted_op_list);
2414 
2415     process_rsc_state(rsc, node, on_fail, migrate_op, data_set);
2416 
2417     if (get_target_role(rsc, &req_role)) {
2418         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
2419             pe__set_next_role(rsc, req_role, XML_RSC_ATTR_TARGET_ROLE);
2420 
2421         } else if (req_role > rsc->next_role) {
2422             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
2423                         " with requested next role %s",
2424                         rsc->id, role2text(rsc->next_role), role2text(req_role));
2425         }
2426     }
2427 
2428     if (saved_role > rsc->role) {
2429         rsc->role = saved_role;
2430     }
2431 
2432     return rsc;
2433 }
2434 
2435 static void
2436 handle_orphaned_container_fillers(xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
2437 {
2438     xmlNode *rsc_entry = NULL;
2439     for (rsc_entry = pcmk__xe_first_child(lrm_rsc_list); rsc_entry != NULL;
2440          rsc_entry = pcmk__xe_next(rsc_entry)) {
2441 
2442         pe_resource_t *rsc;
2443         pe_resource_t *container;
2444         const char *rsc_id;
2445         const char *container_id;
2446 
2447         if (!pcmk__str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, pcmk__str_casei)) {
2448             continue;
2449         }
2450 
2451         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
2452         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2453         if (container_id == NULL || rsc_id == NULL) {
2454             continue;
2455         }
2456 
2457         container = pe_find_resource(data_set->resources, container_id);
2458         if (container == NULL) {
2459             continue;
2460         }
2461 
2462         rsc = pe_find_resource(data_set->resources, rsc_id);
2463         if (rsc == NULL ||
2464             !pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler) ||
2465             rsc->container != NULL) {
2466             continue;
2467         }
2468 
2469         pe_rsc_trace(rsc, "Mapped container of orphaned resource %s to %s",
2470                      rsc->id, container_id);
2471         rsc->container = container;
2472         container->fillers = g_list_append(container->fillers, rsc);
2473     }
2474 }
2475 
2476 /*!
2477  * \internal
2478  * \brief Unpack one node's lrm status section
2479  *
2480  * \param[in] node      Node whose status is being unpacked
2481  * \param[in] xml       CIB node state XML
2482  * \param[in] data_set  Cluster working set
2483  */
2484 static void
2485 unpack_node_lrm(pe_node_t *node, xmlNode *xml, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
2486 {
2487     bool found_orphaned_container_filler = false;
2488 
2489     // Drill down to lrm_resources section
2490     xml = find_xml_node(xml, XML_CIB_TAG_LRM, FALSE);
2491     if (xml == NULL) {
2492         return;
2493     }
2494     xml = find_xml_node(xml, XML_LRM_TAG_RESOURCES, FALSE);
2495     if (xml == NULL) {
2496         return;
2497     }
2498 
2499     // Unpack each lrm_resource entry
2500     for (xmlNode *rsc_entry = first_named_child(xml, XML_LRM_TAG_RESOURCE);
2501          rsc_entry != NULL; rsc_entry = crm_next_same_xml(rsc_entry)) {
2502 
2503         pe_resource_t *rsc = unpack_lrm_resource(node, rsc_entry, data_set);
2504 
2505         if ((rsc != NULL)
2506             && pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
2507             found_orphaned_container_filler = true;
2508         }
2509     }
2510 
2511     /* Now that all resource state has been unpacked for this node, map any
2512      * orphaned container fillers to their container resource.
2513      */
2514     if (found_orphaned_container_filler) {
2515         handle_orphaned_container_fillers(xml, data_set);
2516     }
2517 }
2518 
2519 static void
2520 set_active(pe_resource_t * rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
2521 {
2522     pe_resource_t *top = uber_parent(rsc);
2523 
2524     if (top && pcmk_is_set(top->flags, pe_rsc_promotable)) {
2525         rsc->role = RSC_ROLE_UNPROMOTED;
2526     } else {
2527         rsc->role = RSC_ROLE_STARTED;
2528     }
2529 }
2530 
2531 static void
2532 set_node_score(gpointer key, gpointer value, gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
2533 {
2534     pe_node_t *node = value;
2535     int *score = user_data;
2536 
2537     node->weight = *score;
2538 }
2539 
2540 #define STATUS_PATH_MAX 1024
2541 static xmlNode *
2542 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
     /* [previous][next][first][last][top][bottom][index][help] */
2543             bool success_only, pe_working_set_t *data_set)
2544 {
2545     int offset = 0;
2546     char xpath[STATUS_PATH_MAX];
2547     xmlNode *xml = NULL;
2548 
2549     offset += snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//node_state[@uname='%s']", node);
2550     offset +=
2551         snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//" XML_LRM_TAG_RESOURCE "[@id='%s']",
2552                  resource);
2553 
2554     /* Need to check against transition_magic too? */
2555     if (source && pcmk__str_eq(op, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
2556         offset +=
2557             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2558                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_target='%s']", op,
2559                      source);
2560     } else if (source && pcmk__str_eq(op, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2561         offset +=
2562             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2563                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_source='%s']", op,
2564                      source);
2565     } else {
2566         offset +=
2567             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2568                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s']", op);
2569     }
2570 
2571     CRM_LOG_ASSERT(offset > 0);
2572     xml = get_xpath_object(xpath, data_set->input, LOG_DEBUG);
2573 
2574     if (xml && success_only) {
2575         int rc = PCMK_OCF_UNKNOWN_ERROR;
2576         int status = PCMK_EXEC_ERROR;
2577 
2578         crm_element_value_int(xml, XML_LRM_ATTR_RC, &rc);
2579         crm_element_value_int(xml, XML_LRM_ATTR_OPSTATUS, &status);
2580         if ((rc != PCMK_OCF_OK) || (status != PCMK_EXEC_DONE)) {
2581             return NULL;
2582         }
2583     }
2584     return xml;
2585 }
2586 
2587 static int
2588 pe__call_id(xmlNode *op_xml)
     /* [previous][next][first][last][top][bottom][index][help] */
2589 {
2590     int id = 0;
2591 
2592     if (op_xml) {
2593         crm_element_value_int(op_xml, XML_LRM_ATTR_CALLID, &id);
2594     }
2595     return id;
2596 }
2597 
2598 /*!
2599  * \brief Check whether a stop happened on the same node after some event
2600  *
2601  * \param[in] rsc       Resource being checked
2602  * \param[in] node      Node being checked
2603  * \param[in] xml_op    Event that stop is being compared to
2604  * \param[in] data_set  Cluster working set
2605  *
2606  * \return TRUE if stop happened after event, FALSE otherwise
2607  *
2608  * \note This is really unnecessary, but kept as a safety mechanism. We
2609  *       currently don't save more than one successful event in history, so this
2610  *       only matters when processing really old CIB files that we don't
2611  *       technically support anymore, or as preparation for logging an extended
2612  *       history in the future.
2613  */
2614 static bool
2615 stop_happened_after(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
2616                     pe_working_set_t *data_set)
2617 {
2618     xmlNode *stop_op = find_lrm_op(rsc->id, CRMD_ACTION_STOP,
2619                                    node->details->uname, NULL, TRUE, data_set);
2620 
2621     return (stop_op && (pe__call_id(stop_op) > pe__call_id(xml_op)));
2622 }
2623 
2624 static void
2625 unpack_migrate_to_success(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
2626                           pe_working_set_t *data_set)
2627 {
2628     /* A successful migration sequence is:
2629      *    migrate_to on source node
2630      *    migrate_from on target node
2631      *    stop on source node
2632      *
2633      * If a migrate_to is followed by a stop, the entire migration (successful
2634      * or failed) is complete, and we don't care what happened on the target.
2635      *
2636      * If no migrate_from has happened, the migration is considered to be
2637      * "partial". If the migrate_from failed, make sure the resource gets
2638      * stopped on both source and target (if up).
2639      *
2640      * If the migrate_to and migrate_from both succeeded (which also implies the
2641      * resource is no longer running on the source), but there is no stop, the
2642      * migration is considered to be "dangling". Schedule a stop on the source
2643      * in this case.
2644      */
2645     int from_rc = 0;
2646     int from_status = 0;
2647     pe_node_t *target_node = NULL;
2648     pe_node_t *source_node = NULL;
2649     xmlNode *migrate_from = NULL;
2650     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2651     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2652 
2653     // Sanity check
2654     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2655 
2656     if (stop_happened_after(rsc, node, xml_op, data_set)) {
2657         return;
2658     }
2659 
2660     // Clones are not allowed to migrate, so role can't be promoted
2661     rsc->role = RSC_ROLE_STARTED;
2662 
2663     target_node = pe_find_node(data_set->nodes, target);
2664     source_node = pe_find_node(data_set->nodes, source);
2665 
2666     // Check whether there was a migrate_from action on the target
2667     migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2668                                source, FALSE, data_set);
2669     if (migrate_from) {
2670         crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
2671         crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS, &from_status);
2672         pe_rsc_trace(rsc, "%s op on %s exited with status=%d, rc=%d",
2673                      ID(migrate_from), target, from_status, from_rc);
2674     }
2675 
2676     if (migrate_from && from_rc == PCMK_OCF_OK
2677         && (from_status == PCMK_EXEC_DONE)) {
2678         /* The migrate_to and migrate_from both succeeded, so mark the migration
2679          * as "dangling". This will be used to schedule a stop action on the
2680          * source without affecting the target.
2681          */
2682         pe_rsc_trace(rsc, "Detected dangling migration op: %s on %s", ID(xml_op),
2683                      source);
2684         rsc->role = RSC_ROLE_STOPPED;
2685         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2686 
2687     } else if (migrate_from && (from_status != PCMK_EXEC_PENDING)) { // Failed
2688         if (target_node && target_node->details->online) {
2689             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2690                          target_node->details->online);
2691             native_add_running(rsc, target_node, data_set, TRUE);
2692         }
2693 
2694     } else { // Pending, or complete but erased
2695         if (target_node && target_node->details->online) {
2696             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2697                          target_node->details->online);
2698 
2699             native_add_running(rsc, target_node, data_set, FALSE);
2700             if (source_node && source_node->details->online) {
2701                 /* This is a partial migration: the migrate_to completed
2702                  * successfully on the source, but the migrate_from has not
2703                  * completed. Remember the source and target; if the newly
2704                  * chosen target remains the same when we schedule actions
2705                  * later, we may continue with the migration.
2706                  */
2707                 rsc->partial_migration_target = target_node;
2708                 rsc->partial_migration_source = source_node;
2709             }
2710         } else {
2711             /* Consider it failed here - forces a restart, prevents migration */
2712             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2713             pe__clear_resource_flags(rsc, pe_rsc_allow_migrate);
2714         }
2715     }
2716 }
2717 
2718 // Is there an action_name in node_name's rsc history newer than call_id?
2719 static bool
2720 newer_op(pe_resource_t *rsc, const char *action_name, const char *node_name,
     /* [previous][next][first][last][top][bottom][index][help] */
2721          int call_id, pe_working_set_t *data_set)
2722 {
2723     xmlNode *action = find_lrm_op(rsc->id, action_name, node_name, NULL, TRUE,
2724                                   data_set);
2725 
2726     return pe__call_id(action) > call_id;
2727 }
2728 
2729 static void
2730 unpack_migrate_to_failure(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
2731                           pe_working_set_t *data_set)
2732 {
2733     int target_stop_id = 0;
2734     int target_migrate_from_id = 0;
2735     xmlNode *target_stop = NULL;
2736     xmlNode *target_migrate_from = NULL;
2737     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2738     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2739 
2740     // Sanity check
2741     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2742 
2743     /* If a migration failed, we have to assume the resource is active. Clones
2744      * are not allowed to migrate, so role can't be promoted.
2745      */
2746     rsc->role = RSC_ROLE_STARTED;
2747 
2748     // Check for stop on the target
2749     target_stop = find_lrm_op(rsc->id, CRMD_ACTION_STOP, target, NULL,
2750                               TRUE, data_set);
2751     target_stop_id = pe__call_id(target_stop);
2752 
2753     // Check for migrate_from on the target
2754     target_migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2755                                       source, TRUE, data_set);
2756     target_migrate_from_id = pe__call_id(target_migrate_from);
2757 
2758     if ((target_stop == NULL) || (target_stop_id < target_migrate_from_id)) {
2759         /* There was no stop on the target, or a stop that happened before a
2760          * migrate_from, so assume the resource is still active on the target
2761          * (if it is up).
2762          */
2763         pe_node_t *target_node = pe_find_node(data_set->nodes, target);
2764 
2765         pe_rsc_trace(rsc, "stop (%d) + migrate_from (%d)",
2766                      target_stop_id, target_migrate_from_id);
2767         if (target_node && target_node->details->online) {
2768             native_add_running(rsc, target_node, data_set, FALSE);
2769         }
2770 
2771     } else if (target_migrate_from == NULL) {
2772         /* We know there was a stop on the target, but there may not have been a
2773          * migrate_from (the stop could have happened before migrate_from was
2774          * scheduled or attempted).
2775          *
2776          * That means this could be a "dangling" migration. But first, check
2777          * whether there is a newer successful stop, start, or migrate_from on
2778          * the source node -- it's possible the failed migration was followed by
2779          * a successful stop, full restart, or migration in the reverse
2780          * direction, in which case we don't want to force a stop.
2781          */
2782         int source_migrate_to_id = pe__call_id(xml_op);
2783 
2784         if (newer_op(rsc, CRMD_ACTION_MIGRATED, source, source_migrate_to_id,
2785                      data_set)
2786             || newer_op(rsc, CRMD_ACTION_START, source, source_migrate_to_id,
2787                      data_set)
2788             || newer_op(rsc, CRMD_ACTION_STOP, source, source_migrate_to_id,
2789                      data_set)) {
2790             return;
2791         }
2792 
2793         // Mark node as having dangling migration so we can force a stop later
2794         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2795     }
2796 }
2797 
2798 static void
2799 unpack_migrate_from_failure(pe_resource_t *rsc, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
2800                             xmlNode *xml_op, pe_working_set_t *data_set)
2801 {
2802     xmlNode *source_stop = NULL;
2803     xmlNode *source_migrate_to = NULL;
2804     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2805     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2806 
2807     // Sanity check
2808     CRM_CHECK(source && target && !strcmp(target, node->details->uname), return);
2809 
2810     /* If a migration failed, we have to assume the resource is active. Clones
2811      * are not allowed to migrate, so role can't be promoted.
2812      */
2813     rsc->role = RSC_ROLE_STARTED;
2814 
2815     // Check for a stop on the source
2816     source_stop = find_lrm_op(rsc->id, CRMD_ACTION_STOP, source, NULL,
2817                               TRUE, data_set);
2818 
2819     // Check for a migrate_to on the source
2820     source_migrate_to = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATE,
2821                                     source, target, TRUE, data_set);
2822 
2823     if ((source_stop == NULL)
2824         || (pe__call_id(source_stop) < pe__call_id(source_migrate_to))) {
2825         /* There was no stop on the source, or a stop that happened before
2826          * migrate_to, so assume the resource is still active on the source (if
2827          * it is up).
2828          */
2829         pe_node_t *source_node = pe_find_node(data_set->nodes, source);
2830 
2831         if (source_node && source_node->details->online) {
2832             native_add_running(rsc, source_node, data_set, TRUE);
2833         }
2834     }
2835 }
2836 
2837 static void
2838 record_failed_op(xmlNode *op, const pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
2839                  const pe_resource_t *rsc, pe_working_set_t *data_set)
2840 {
2841     xmlNode *xIter = NULL;
2842     const char *op_key = crm_element_value(op, XML_LRM_ATTR_TASK_KEY);
2843 
2844     if (node->details->online == FALSE) {
2845         return;
2846     }
2847 
2848     for (xIter = data_set->failed->children; xIter; xIter = xIter->next) {
2849         const char *key = crm_element_value(xIter, XML_LRM_ATTR_TASK_KEY);
2850         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
2851 
2852         if(pcmk__str_eq(op_key, key, pcmk__str_casei) && pcmk__str_eq(uname, node->details->uname, pcmk__str_casei)) {
2853             crm_trace("Skipping duplicate entry %s on %s", op_key, node->details->uname);
2854             return;
2855         }
2856     }
2857 
2858     crm_trace("Adding entry %s on %s", op_key, node->details->uname);
2859     crm_xml_add(op, XML_ATTR_UNAME, node->details->uname);
2860     crm_xml_add(op, XML_LRM_ATTR_RSCID, rsc->id);
2861     add_node_copy(data_set->failed, op);
2862 }
2863 
2864 static const char *get_op_key(xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
2865 {
2866     const char *key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY);
2867     if(key == NULL) {
2868         key = ID(xml_op);
2869     }
2870     return key;
2871 }
2872 
2873 static const char *
2874 last_change_str(xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
2875 {
2876     time_t when;
2877     const char *when_s = NULL;
2878 
2879     if (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
2880                                 &when) == pcmk_ok) {
2881         when_s = pcmk__epoch2str(&when);
2882         if (when_s) {
2883             // Skip day of week to make message shorter
2884             when_s = strchr(when_s, ' ');
2885             if (when_s) {
2886                 ++when_s;
2887             }
2888         }
2889     }
2890     return ((when_s && *when_s)? when_s : "unknown time");
2891 }
2892 
2893 /*!
2894  * \internal
2895  * \brief Compare two on-fail values
2896  *
2897  * \param[in] first   One on-fail value to compare
2898  * \param[in] second  The other on-fail value to compare
2899  *
2900  * \return A negative number if second is more severe than first, zero if they
2901  *         are equal, or a positive number if first is more severe than second.
2902  * \note This is only needed until the action_fail_response values can be
2903  *       renumbered at the next API compatibility break.
2904  */
2905 static int
2906 cmp_on_fail(enum action_fail_response first, enum action_fail_response second)
     /* [previous][next][first][last][top][bottom][index][help] */
2907 {
2908     switch (first) {
2909         case action_fail_demote:
2910             switch (second) {
2911                 case action_fail_ignore:
2912                     return 1;
2913                 case action_fail_demote:
2914                     return 0;
2915                 default:
2916                     return -1;
2917             }
2918             break;
2919 
2920         case action_fail_reset_remote:
2921             switch (second) {
2922                 case action_fail_ignore:
2923                 case action_fail_demote:
2924                 case action_fail_recover:
2925                     return 1;
2926                 case action_fail_reset_remote:
2927                     return 0;
2928                 default:
2929                     return -1;
2930             }
2931             break;
2932 
2933         case action_fail_restart_container:
2934             switch (second) {
2935                 case action_fail_ignore:
2936                 case action_fail_demote:
2937                 case action_fail_recover:
2938                 case action_fail_reset_remote:
2939                     return 1;
2940                 case action_fail_restart_container:
2941                     return 0;
2942                 default:
2943                     return -1;
2944             }
2945             break;
2946 
2947         default:
2948             break;
2949     }
2950     switch (second) {
2951         case action_fail_demote:
2952             return (first == action_fail_ignore)? -1 : 1;
2953 
2954         case action_fail_reset_remote:
2955             switch (first) {
2956                 case action_fail_ignore:
2957                 case action_fail_demote:
2958                 case action_fail_recover:
2959                     return -1;
2960                 default:
2961                     return 1;
2962             }
2963             break;
2964 
2965         case action_fail_restart_container:
2966             switch (first) {
2967                 case action_fail_ignore:
2968                 case action_fail_demote:
2969                 case action_fail_recover:
2970                 case action_fail_reset_remote:
2971                     return -1;
2972                 default:
2973                     return 1;
2974             }
2975             break;
2976 
2977         default:
2978             break;
2979     }
2980     return first - second;
2981 }
2982 
2983 static void
2984 unpack_rsc_op_failure(pe_resource_t * rsc, pe_node_t * node, int rc, xmlNode * xml_op, xmlNode ** last_failure,
     /* [previous][next][first][last][top][bottom][index][help] */
2985                       enum action_fail_response * on_fail, pe_working_set_t * data_set)
2986 {
2987     guint interval_ms = 0;
2988     bool is_probe = false;
2989     pe_action_t *action = NULL;
2990 
2991     const char *key = get_op_key(xml_op);
2992     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
2993     const char *exit_reason = crm_element_value(xml_op,
2994                                                 XML_LRM_ATTR_EXIT_REASON);
2995 
2996     CRM_ASSERT(rsc);
2997     CRM_CHECK(task != NULL, return);
2998 
2999     *last_failure = xml_op;
3000 
3001     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3002     if ((interval_ms == 0) && !strcmp(task, CRMD_ACTION_STATUS)) {
3003         is_probe = true;
3004     }
3005 
3006     if (exit_reason == NULL) {
3007         exit_reason = "";
3008     }
3009 
3010     if (!pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)
3011         && (rc == PCMK_OCF_NOT_INSTALLED)) {
3012         crm_trace("Unexpected result (%s%s%s) was recorded for "
3013                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
3014                   services_ocf_exitcode_str(rc),
3015                   (*exit_reason? ": " : ""), exit_reason,
3016                   (is_probe? "probe" : task), rsc->id, node->details->uname,
3017                   last_change_str(xml_op), rc, ID(xml_op));
3018     } else {
3019         crm_warn("Unexpected result (%s%s%s) was recorded for "
3020                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
3021                  services_ocf_exitcode_str(rc),
3022                  (*exit_reason? ": " : ""), exit_reason,
3023                  (is_probe? "probe" : task), rsc->id, node->details->uname,
3024                  last_change_str(xml_op), rc, ID(xml_op));
3025 
3026         if (is_probe && (rc != PCMK_OCF_OK)
3027             && (rc != PCMK_OCF_NOT_RUNNING)
3028             && (rc != PCMK_OCF_RUNNING_PROMOTED)) {
3029 
3030             /* A failed (not just unexpected) probe result could mean the user
3031              * didn't know resources will be probed even where they can't run.
3032              */
3033             crm_notice("If it is not possible for %s to run on %s, see "
3034                        "the resource-discovery option for location constraints",
3035                        rsc->id, node->details->uname);
3036         }
3037 
3038         record_failed_op(xml_op, node, rsc, data_set);
3039     }
3040 
3041     action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3042     if (cmp_on_fail(*on_fail, action->on_fail) < 0) {
3043         pe_rsc_trace(rsc, "on-fail %s -> %s for %s (%s)", fail2text(*on_fail),
3044                      fail2text(action->on_fail), action->uuid, key);
3045         *on_fail = action->on_fail;
3046     }
3047 
3048     if (!strcmp(task, CRMD_ACTION_STOP)) {
3049         resource_location(rsc, node, -INFINITY, "__stop_fail__", data_set);
3050 
3051     } else if (!strcmp(task, CRMD_ACTION_MIGRATE)) {
3052         unpack_migrate_to_failure(rsc, node, xml_op, data_set);
3053 
3054     } else if (!strcmp(task, CRMD_ACTION_MIGRATED)) {
3055         unpack_migrate_from_failure(rsc, node, xml_op, data_set);
3056 
3057     } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
3058         rsc->role = RSC_ROLE_PROMOTED;
3059 
3060     } else if (!strcmp(task, CRMD_ACTION_DEMOTE)) {
3061         if (action->on_fail == action_fail_block) {
3062             rsc->role = RSC_ROLE_PROMOTED;
3063             pe__set_next_role(rsc, RSC_ROLE_STOPPED,
3064                               "demote with on-fail=block");
3065 
3066         } else if(rc == PCMK_OCF_NOT_RUNNING) {
3067             rsc->role = RSC_ROLE_STOPPED;
3068 
3069         } else {
3070             /* Staying in the promoted role would put the scheduler and
3071              * controller into a loop. Setting the role to unpromoted is not
3072              * dangerous because the resource will be stopped as part of
3073              * recovery, and any promotion will be ordered after that stop.
3074              */
3075             rsc->role = RSC_ROLE_UNPROMOTED;
3076         }
3077     }
3078 
3079     if(is_probe && rc == PCMK_OCF_NOT_INSTALLED) {
3080         /* leave stopped */
3081         pe_rsc_trace(rsc, "Leaving %s stopped", rsc->id);
3082         rsc->role = RSC_ROLE_STOPPED;
3083 
3084     } else if (rsc->role < RSC_ROLE_STARTED) {
3085         pe_rsc_trace(rsc, "Setting %s active", rsc->id);
3086         set_active(rsc);
3087     }
3088 
3089     pe_rsc_trace(rsc, "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
3090                  rsc->id, role2text(rsc->role),
3091                  pcmk__btoa(node->details->unclean),
3092                  fail2text(action->on_fail), role2text(action->fail_role));
3093 
3094     if (action->fail_role != RSC_ROLE_STARTED && rsc->next_role < action->fail_role) {
3095         pe__set_next_role(rsc, action->fail_role, "failure");
3096     }
3097 
3098     if (action->fail_role == RSC_ROLE_STOPPED) {
3099         int score = -INFINITY;
3100 
3101         pe_resource_t *fail_rsc = rsc;
3102 
3103         if (fail_rsc->parent) {
3104             pe_resource_t *parent = uber_parent(fail_rsc);
3105 
3106             if (pe_rsc_is_clone(parent)
3107                 && !pcmk_is_set(parent->flags, pe_rsc_unique)) {
3108                 /* For clone resources, if a child fails on an operation
3109                  * with on-fail = stop, all the resources fail.  Do this by preventing
3110                  * the parent from coming up again. */
3111                 fail_rsc = parent;
3112             }
3113         }
3114         crm_notice("%s will not be started under current conditions",
3115                    fail_rsc->id);
3116         /* make sure it doesn't come up again */
3117         if (fail_rsc->allowed_nodes != NULL) {
3118             g_hash_table_destroy(fail_rsc->allowed_nodes);
3119         }
3120         fail_rsc->allowed_nodes = pe__node_list2table(data_set->nodes);
3121         g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
3122     }
3123 
3124     pe_free_action(action);
3125 }
3126 
3127 /*!
3128  * \internal
3129  * \brief Remap operation status based on action result
3130  *
3131  * Given an action result, determine an appropriate operation status for the
3132  * purposes of responding to the action (the status provided by the executor is
3133  * not directly usable since the executor does not know what was expected).
3134  *
3135  * \param[in,out] rsc        Resource that operation history entry is for
3136  * \param[in]     rc         Actual return code of operation
3137  * \param[in]     target_rc  Expected return code of operation
3138  * \param[in]     node       Node where operation was executed
3139  * \param[in]     xml_op     Operation history entry XML from CIB status
3140  * \param[in,out] on_fail    What should be done about the result
3141  * \param[in]     data_set   Current cluster working set
3142  *
3143  * \return Operation status based on return code and action info
3144  * \note This may update the resource's current and next role.
3145  */
3146 static int
3147 determine_op_status(
     /* [previous][next][first][last][top][bottom][index][help] */
3148     pe_resource_t *rsc, int rc, int target_rc, pe_node_t * node, xmlNode * xml_op, enum action_fail_response * on_fail, pe_working_set_t * data_set) 
3149 {
3150     guint interval_ms = 0;
3151     bool is_probe = false;
3152     int result = PCMK_EXEC_DONE;
3153     const char *key = get_op_key(xml_op);
3154     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3155     const char *exit_reason = crm_element_value(xml_op,
3156                                                 XML_LRM_ATTR_EXIT_REASON);
3157 
3158     CRM_ASSERT(rsc);
3159     CRM_CHECK(task != NULL, return PCMK_EXEC_ERROR);
3160 
3161     if (exit_reason == NULL) {
3162         exit_reason = "";
3163     }
3164 
3165     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3166     if ((interval_ms == 0) && !strcmp(task, CRMD_ACTION_STATUS)) {
3167         is_probe = true;
3168         task = "probe";
3169     }
3170 
3171     if (target_rc < 0) {
3172         /* Pre-1.0 Pacemaker versions, and Pacemaker 1.1.6 or earlier with
3173          * Heartbeat 2.0.7 or earlier as the cluster layer, did not include the
3174          * target_rc in the transition key, which (along with the similar case
3175          * of a corrupted transition key in the CIB) will be reported to this
3176          * function as -1. Pacemaker 2.0+ does not support rolling upgrades from
3177          * those versions or processing of saved CIB files from those versions,
3178          * so we do not need to care much about this case.
3179          */
3180         result = PCMK_EXEC_ERROR;
3181         crm_warn("Expected result not found for %s on %s (corrupt or obsolete CIB?)",
3182                  key, node->details->uname);
3183 
3184     } else if (target_rc != rc) {
3185         result = PCMK_EXEC_ERROR;
3186         pe_rsc_debug(rsc, "%s on %s: expected %d (%s), got %d (%s%s%s)",
3187                      key, node->details->uname,
3188                      target_rc, services_ocf_exitcode_str(target_rc),
3189                      rc, services_ocf_exitcode_str(rc),
3190                      (*exit_reason? ": " : ""), exit_reason);
3191     }
3192 
3193     switch (rc) {
3194         case PCMK_OCF_OK:
3195             if (is_probe && (target_rc == PCMK_OCF_NOT_RUNNING)) {
3196                 result = PCMK_EXEC_DONE;
3197                 pe_rsc_info(rsc, "Probe found %s active on %s at %s",
3198                             rsc->id, node->details->uname,
3199                             last_change_str(xml_op));
3200             }
3201             break;
3202 
3203         case PCMK_OCF_NOT_RUNNING:
3204             if (is_probe || (target_rc == rc)
3205                 || !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
3206 
3207                 result = PCMK_EXEC_DONE;
3208                 rsc->role = RSC_ROLE_STOPPED;
3209 
3210                 /* clear any previous failure actions */
3211                 *on_fail = action_fail_ignore;
3212                 pe__set_next_role(rsc, RSC_ROLE_UNKNOWN, "not running");
3213             }
3214             break;
3215 
3216         case PCMK_OCF_RUNNING_PROMOTED:
3217             if (is_probe && (rc != target_rc)) {
3218                 result = PCMK_EXEC_DONE;
3219                 pe_rsc_info(rsc,
3220                             "Probe found %s active and promoted on %s at %s",
3221                             rsc->id, node->details->uname,
3222                             last_change_str(xml_op));
3223             }
3224             rsc->role = RSC_ROLE_PROMOTED;
3225             break;
3226 
3227         case PCMK_OCF_DEGRADED_PROMOTED:
3228         case PCMK_OCF_FAILED_PROMOTED:
3229             rsc->role = RSC_ROLE_PROMOTED;
3230             result = PCMK_EXEC_ERROR;
3231             break;
3232 
3233         case PCMK_OCF_NOT_CONFIGURED:
3234             result = PCMK_EXEC_ERROR_FATAL;
3235             break;
3236 
3237         case PCMK_OCF_UNIMPLEMENT_FEATURE:
3238             if (interval_ms > 0) {
3239                 result = PCMK_EXEC_NOT_SUPPORTED;
3240                 break;
3241             }
3242             // fall through
3243         case PCMK_OCF_NOT_INSTALLED:
3244         case PCMK_OCF_INVALID_PARAM:
3245         case PCMK_OCF_INSUFFICIENT_PRIV:
3246             if (!pe_can_fence(data_set, node)
3247                 && !strcmp(task, CRMD_ACTION_STOP)) {
3248                 /* If a stop fails and we can't fence, there's nothing else we can do */
3249                 pe_proc_err("No further recovery can be attempted for %s "
3250                             "because %s on %s failed (%s%s%s) at %s "
3251                             CRM_XS " rc=%d id=%s", rsc->id, task,
3252                             node->details->uname, services_ocf_exitcode_str(rc),
3253                             (*exit_reason? ": " : ""), exit_reason,
3254                             last_change_str(xml_op), rc, ID(xml_op));
3255                 pe__clear_resource_flags(rsc, pe_rsc_managed);
3256                 pe__set_resource_flags(rsc, pe_rsc_block);
3257             }
3258             result = PCMK_EXEC_ERROR_HARD;
3259             break;
3260 
3261         default:
3262             if (result == PCMK_EXEC_DONE) {
3263                 crm_info("Treating unknown exit status %d from %s of %s "
3264                          "on %s at %s as failure",
3265                          rc, task, rsc->id, node->details->uname,
3266                          last_change_str(xml_op));
3267                 result = PCMK_EXEC_ERROR;
3268             }
3269             break;
3270     }
3271     return result;
3272 }
3273 
3274 // return TRUE if start or monitor last failure but parameters changed
3275 static bool
3276 should_clear_for_param_change(xmlNode *xml_op, const char *task,
     /* [previous][next][first][last][top][bottom][index][help] */
3277                               pe_resource_t *rsc, pe_node_t *node,
3278                               pe_working_set_t *data_set)
3279 {
3280     if (!strcmp(task, "start") || !strcmp(task, "monitor")) {
3281 
3282         if (pe__bundle_needs_remote_name(rsc, data_set)) {
3283             /* We haven't allocated resources yet, so we can't reliably
3284              * substitute addr parameters for the REMOTE_CONTAINER_HACK.
3285              * When that's needed, defer the check until later.
3286              */
3287             pe__add_param_check(xml_op, rsc, node, pe_check_last_failure,
3288                                 data_set);
3289 
3290         } else {
3291             op_digest_cache_t *digest_data = NULL;
3292 
3293             digest_data = rsc_action_digest_cmp(rsc, xml_op, node, data_set);
3294             switch (digest_data->rc) {
3295                 case RSC_DIGEST_UNKNOWN:
3296                     crm_trace("Resource %s history entry %s on %s"
3297                               " has no digest to compare",
3298                               rsc->id, get_op_key(xml_op), node->details->id);
3299                     break;
3300                 case RSC_DIGEST_MATCH:
3301                     break;
3302                 default:
3303                     return TRUE;
3304             }
3305         }
3306     }
3307     return FALSE;
3308 }
3309 
3310 // Order action after fencing of remote node, given connection rsc
3311 static void
3312 order_after_remote_fencing(pe_action_t *action, pe_resource_t *remote_conn,
     /* [previous][next][first][last][top][bottom][index][help] */
3313                            pe_working_set_t *data_set)
3314 {
3315     pe_node_t *remote_node = pe_find_node(data_set->nodes, remote_conn->id);
3316 
3317     if (remote_node) {
3318         pe_action_t *fence = pe_fence_op(remote_node, NULL, TRUE, NULL,
3319                                          FALSE, data_set);
3320 
3321         order_actions(fence, action, pe_order_implies_then);
3322     }
3323 }
3324 
3325 static bool
3326 should_ignore_failure_timeout(pe_resource_t *rsc, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
3327                               const char *task, guint interval_ms,
3328                               bool is_last_failure, pe_working_set_t *data_set)
3329 {
3330     /* Clearing failures of recurring monitors has special concerns. The
3331      * executor reports only changes in the monitor result, so if the
3332      * monitor is still active and still getting the same failure result,
3333      * that will go undetected after the failure is cleared.
3334      *
3335      * Also, the operation history will have the time when the recurring
3336      * monitor result changed to the given code, not the time when the
3337      * result last happened.
3338      *
3339      * @TODO We probably should clear such failures only when the failure
3340      * timeout has passed since the last occurrence of the failed result.
3341      * However we don't record that information. We could maybe approximate
3342      * that by clearing only if there is a more recent successful monitor or
3343      * stop result, but we don't even have that information at this point
3344      * since we are still unpacking the resource's operation history.
3345      *
3346      * This is especially important for remote connection resources with a
3347      * reconnect interval, so in that case, we skip clearing failures
3348      * if the remote node hasn't been fenced.
3349      */
3350     if (rsc->remote_reconnect_ms
3351         && pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3352         && (interval_ms != 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3353 
3354         pe_node_t *remote_node = pe_find_node(data_set->nodes, rsc->id);
3355 
3356         if (remote_node && !remote_node->details->remote_was_fenced) {
3357             if (is_last_failure) {
3358                 crm_info("Waiting to clear monitor failure for remote node %s"
3359                          " until fencing has occurred", rsc->id);
3360             }
3361             return TRUE;
3362         }
3363     }
3364     return FALSE;
3365 }
3366 
3367 /*!
3368  * \internal
3369  * \brief Check operation age and schedule failure clearing when appropriate
3370  *
3371  * This function has two distinct purposes. The first is to check whether an
3372  * operation history entry is expired (i.e. the resource has a failure timeout,
3373  * the entry is older than the timeout, and the resource either has no fail
3374  * count or its fail count is entirely older than the timeout). The second is to
3375  * schedule fail count clearing when appropriate (i.e. the operation is expired
3376  * and either the resource has an expired fail count or the operation is a
3377  * last_failure for a remote connection resource with a reconnect interval,
3378  * or the operation is a last_failure for a start or monitor operation and the
3379  * resource's parameters have changed since the operation).
3380  *
3381  * \param[in] rsc       Resource that operation happened to
3382  * \param[in] node      Node that operation happened on
3383  * \param[in] rc        Actual result of operation
3384  * \param[in] xml_op    Operation history entry XML
3385  * \param[in] data_set  Current working set
3386  *
3387  * \return TRUE if operation history entry is expired, FALSE otherwise
3388  */
3389 static bool
3390 check_operation_expiry(pe_resource_t *rsc, pe_node_t *node, int rc,
     /* [previous][next][first][last][top][bottom][index][help] */
3391                        xmlNode *xml_op, pe_working_set_t *data_set)
3392 {
3393     bool expired = FALSE;
3394     bool is_last_failure = pcmk__ends_with(ID(xml_op), "_last_failure_0");
3395     time_t last_run = 0;
3396     guint interval_ms = 0;
3397     int unexpired_fail_count = 0;
3398     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3399     const char *clear_reason = NULL;
3400 
3401     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3402 
3403     if ((rsc->failure_timeout > 0)
3404         && (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
3405                                     &last_run) == 0)) {
3406 
3407         // Resource has a failure-timeout, and history entry has a timestamp
3408 
3409         time_t now = get_effective_time(data_set);
3410         time_t last_failure = 0;
3411 
3412         // Is this particular operation history older than the failure timeout?
3413         if ((now >= (last_run + rsc->failure_timeout))
3414             && !should_ignore_failure_timeout(rsc, xml_op, task, interval_ms,
3415                                               is_last_failure, data_set)) {
3416             expired = TRUE;
3417         }
3418 
3419         // Does the resource as a whole have an unexpired fail count?
3420         unexpired_fail_count = pe_get_failcount(node, rsc, &last_failure,
3421                                                 pe_fc_effective, xml_op,
3422                                                 data_set);
3423 
3424         // Update scheduler recheck time according to *last* failure
3425         crm_trace("%s@%lld is %sexpired @%lld with unexpired_failures=%d timeout=%ds"
3426                   " last-failure@%lld",
3427                   ID(xml_op), (long long) last_run, (expired? "" : "not "),
3428                   (long long) now, unexpired_fail_count, rsc->failure_timeout,
3429                   (long long) last_failure);
3430         last_failure += rsc->failure_timeout + 1;
3431         if (unexpired_fail_count && (now < last_failure)) {
3432             pe__update_recheck_time(last_failure, data_set);
3433         }
3434     }
3435 
3436     if (expired) {
3437         if (pe_get_failcount(node, rsc, NULL, pe_fc_default, xml_op, data_set)) {
3438 
3439             // There is a fail count ignoring timeout
3440 
3441             if (unexpired_fail_count == 0) {
3442                 // There is no fail count considering timeout
3443                 clear_reason = "it expired";
3444 
3445             } else {
3446                 /* This operation is old, but there is an unexpired fail count.
3447                  * In a properly functioning cluster, this should only be
3448                  * possible if this operation is not a failure (otherwise the
3449                  * fail count should be expired too), so this is really just a
3450                  * failsafe.
3451                  */
3452                 expired = FALSE;
3453             }
3454 
3455         } else if (is_last_failure && rsc->remote_reconnect_ms) {
3456             /* Clear any expired last failure when reconnect interval is set,
3457              * even if there is no fail count.
3458              */
3459             clear_reason = "reconnect interval is set";
3460         }
3461     }
3462 
3463     if (!expired && is_last_failure
3464         && should_clear_for_param_change(xml_op, task, rsc, node, data_set)) {
3465         clear_reason = "resource parameters have changed";
3466     }
3467 
3468     if (clear_reason != NULL) {
3469         // Schedule clearing of the fail count
3470         pe_action_t *clear_op = pe__clear_failcount(rsc, node, clear_reason,
3471                                                     data_set);
3472 
3473         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3474             && rsc->remote_reconnect_ms) {
3475             /* If we're clearing a remote connection due to a reconnect
3476              * interval, we want to wait until any scheduled fencing
3477              * completes.
3478              *
3479              * We could limit this to remote_node->details->unclean, but at
3480              * this point, that's always true (it won't be reliable until
3481              * after unpack_node_history() is done).
3482              */
3483             crm_info("Clearing %s failure will wait until any scheduled "
3484                      "fencing of %s completes", task, rsc->id);
3485             order_after_remote_fencing(clear_op, rsc, data_set);
3486         }
3487     }
3488 
3489     if (expired && (interval_ms == 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3490         switch(rc) {
3491             case PCMK_OCF_OK:
3492             case PCMK_OCF_NOT_RUNNING:
3493             case PCMK_OCF_RUNNING_PROMOTED:
3494             case PCMK_OCF_DEGRADED:
3495             case PCMK_OCF_DEGRADED_PROMOTED:
3496                 // Don't expire probes that return these values
3497                 expired = FALSE;
3498                 break;
3499         }
3500     }
3501 
3502     return expired;
3503 }
3504 
3505 int pe__target_rc_from_xml(xmlNode *xml_op)
     /* [previous][next][first][last][top][bottom][index][help] */
3506 {
3507     int target_rc = 0;
3508     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
3509 
3510     if (key == NULL) {
3511         return -1;
3512     }
3513     decode_transition_key(key, NULL, NULL, NULL, &target_rc);
3514     return target_rc;
3515 }
3516 
3517 static enum action_fail_response
3518 get_action_on_fail(pe_resource_t *rsc, const char *key, const char *task, pe_working_set_t * data_set) 
     /* [previous][next][first][last][top][bottom][index][help] */
3519 {
3520     enum action_fail_response result = action_fail_recover;
3521     pe_action_t *action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3522 
3523     result = action->on_fail;
3524     pe_free_action(action);
3525 
3526     return result;
3527 }
3528 
3529 static void
3530 update_resource_state(pe_resource_t * rsc, pe_node_t * node, xmlNode * xml_op, const char * task, int rc,
     /* [previous][next][first][last][top][bottom][index][help] */
3531                       xmlNode * last_failure, enum action_fail_response * on_fail, pe_working_set_t * data_set)
3532 {
3533     gboolean clear_past_failure = FALSE;
3534 
3535     CRM_ASSERT(rsc);
3536     CRM_ASSERT(xml_op);
3537 
3538     if (rc == PCMK_OCF_NOT_RUNNING) {
3539         clear_past_failure = TRUE;
3540 
3541     } else if (rc == PCMK_OCF_NOT_INSTALLED) {
3542         rsc->role = RSC_ROLE_STOPPED;
3543 
3544     } else if (pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3545         if (last_failure) {
3546             const char *op_key = get_op_key(xml_op);
3547             const char *last_failure_key = get_op_key(last_failure);
3548 
3549             if (pcmk__str_eq(op_key, last_failure_key, pcmk__str_casei)) {
3550                 clear_past_failure = TRUE;
3551             }
3552         }
3553 
3554         if (rsc->role < RSC_ROLE_STARTED) {
3555             set_active(rsc);
3556         }
3557 
3558     } else if (pcmk__str_eq(task, CRMD_ACTION_START, pcmk__str_casei)) {
3559         rsc->role = RSC_ROLE_STARTED;
3560         clear_past_failure = TRUE;
3561 
3562     } else if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)) {
3563         rsc->role = RSC_ROLE_STOPPED;
3564         clear_past_failure = TRUE;
3565 
3566     } else if (pcmk__str_eq(task, CRMD_ACTION_PROMOTE, pcmk__str_casei)) {
3567         rsc->role = RSC_ROLE_PROMOTED;
3568         clear_past_failure = TRUE;
3569 
3570     } else if (pcmk__str_eq(task, CRMD_ACTION_DEMOTE, pcmk__str_casei)) {
3571 
3572         if (*on_fail == action_fail_demote) {
3573             // Demote clears an error only if on-fail=demote
3574             clear_past_failure = TRUE;
3575         }
3576         rsc->role = RSC_ROLE_UNPROMOTED;
3577 
3578     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
3579         rsc->role = RSC_ROLE_STARTED;
3580         clear_past_failure = TRUE;
3581 
3582     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
3583         unpack_migrate_to_success(rsc, node, xml_op, data_set);
3584 
3585     } else if (rsc->role < RSC_ROLE_STARTED) {
3586         pe_rsc_trace(rsc, "%s active on %s", rsc->id, node->details->uname);
3587         set_active(rsc);
3588     }
3589 
3590     /* clear any previous failure actions */
3591     if (clear_past_failure) {
3592         switch (*on_fail) {
3593             case action_fail_stop:
3594             case action_fail_fence:
3595             case action_fail_migrate:
3596             case action_fail_standby:
3597                 pe_rsc_trace(rsc, "%s.%s is not cleared by a completed stop",
3598                              rsc->id, fail2text(*on_fail));
3599                 break;
3600 
3601             case action_fail_block:
3602             case action_fail_ignore:
3603             case action_fail_demote:
3604             case action_fail_recover:
3605             case action_fail_restart_container:
3606                 *on_fail = action_fail_ignore;
3607                 pe__set_next_role(rsc, RSC_ROLE_UNKNOWN, "clear past failures");
3608                 break;
3609             case action_fail_reset_remote:
3610                 if (rsc->remote_reconnect_ms == 0) {
3611                     /* With no reconnect interval, the connection is allowed to
3612                      * start again after the remote node is fenced and
3613                      * completely stopped. (With a reconnect interval, we wait
3614                      * for the failure to be cleared entirely before attempting
3615                      * to reconnect.)
3616                      */
3617                     *on_fail = action_fail_ignore;
3618                     pe__set_next_role(rsc, RSC_ROLE_UNKNOWN,
3619                                       "clear past failures and reset remote");
3620                 }
3621                 break;
3622         }
3623     }
3624 }
3625 
3626 /*!
3627  * \internal
3628  * \brief Remap informational monitor results to usual values
3629  *
3630  * Certain OCF result codes are for providing extended information to the
3631  * user about services that aren't yet failed but not entirely healthy either.
3632  * These must be treated as the "normal" result by Pacemaker.
3633  *
3634  * \param[in] rc        Actual result of a monitor action
3635  * \param[in] xml_op    Operation history XML
3636  * \param[in] node      Node that operation happened on
3637  * \param[in] rsc       Resource that operation happened to
3638  * \param[in] data_set  Cluster working set
3639  *
3640  * \return Result code that pacemaker should use
3641  *
3642  * \note If the result is remapped, and the node is not shutting down or failed,
3643  *       the operation will be recorded in the data set's list of failed
3644  *       operations, to highlight it for the user.
3645  */
3646 static int
3647 remap_monitor_rc(int rc, xmlNode *xml_op, const pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
3648                  const pe_resource_t *rsc, pe_working_set_t *data_set)
3649 {
3650     int remapped_rc = pcmk__effective_rc(rc);
3651 
3652     if (rc != remapped_rc) {
3653         crm_trace("Remapping monitor result %d to %d", rc, remapped_rc);
3654         if (!node->details->shutdown || node->details->online) {
3655             record_failed_op(xml_op, node, rsc, data_set);
3656         }
3657     }
3658     return remapped_rc;
3659 }
3660 
3661 static void
3662 unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
     /* [previous][next][first][last][top][bottom][index][help] */
3663               xmlNode **last_failure, enum action_fail_response *on_fail,
3664               pe_working_set_t *data_set)
3665 {
3666     int rc = 0;
3667     int task_id = 0;
3668     int target_rc = 0;
3669     int status = PCMK_EXEC_UNKNOWN;
3670     guint interval_ms = 0;
3671     const char *task = NULL;
3672     const char *task_key = NULL;
3673     const char *exit_reason = NULL;
3674     bool expired = FALSE;
3675     pe_resource_t *parent = rsc;
3676     enum action_fail_response failure_strategy = action_fail_recover;
3677 
3678     CRM_CHECK(rsc && node && xml_op, return);
3679 
3680     target_rc = pe__target_rc_from_xml(xml_op);
3681     task_key = get_op_key(xml_op);
3682     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3683     exit_reason = crm_element_value(xml_op, XML_LRM_ATTR_EXIT_REASON);
3684     if (exit_reason == NULL) {
3685         exit_reason = "";
3686     }
3687 
3688     crm_element_value_int(xml_op, XML_LRM_ATTR_RC, &rc);
3689     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
3690     crm_element_value_int(xml_op, XML_LRM_ATTR_OPSTATUS, &status);
3691     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3692 
3693     CRM_CHECK(task != NULL, return);
3694     CRM_CHECK((status >= PCMK_EXEC_PENDING) && (status <= PCMK_EXEC_MAX),
3695               return);
3696 
3697     if (!strcmp(task, CRMD_ACTION_NOTIFY) ||
3698         !strcmp(task, CRMD_ACTION_METADATA)) {
3699         /* safe to ignore these */
3700         return;
3701     }
3702 
3703     if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
3704         parent = uber_parent(rsc);
3705     }
3706 
3707     pe_rsc_trace(rsc, "Unpacking task %s/%s (call_id=%d, status=%d, rc=%d) on %s (role=%s)",
3708                  task_key, task, task_id, status, rc, node->details->uname, role2text(rsc->role));
3709 
3710     if (node->details->unclean) {
3711         pe_rsc_trace(rsc, "Node %s (where %s is running) is unclean."
3712                      " Further action depends on the value of the stop's on-fail attribute",
3713                      node->details->uname, rsc->id);
3714     }
3715 
3716     /* It should be possible to call remap_monitor_rc() first then call
3717      * check_operation_expiry() only if rc != target_rc, because there should
3718      * never be a fail count without at least one unexpected result in the
3719      * resource history. That would be more efficient by avoiding having to call
3720      * check_operation_expiry() for expected results.
3721      *
3722      * However, we do have such configurations in the scheduler regression
3723      * tests, even if it shouldn't be possible with the current code. It's
3724      * probably a good idea anyway, but that would require updating the test
3725      * inputs to something currently possible.
3726      */
3727 
3728     if ((status != PCMK_EXEC_NOT_INSTALLED)
3729         && check_operation_expiry(rsc, node, rc, xml_op, data_set)) {
3730         expired = TRUE;
3731     }
3732 
3733     if (!strcmp(task, CRMD_ACTION_STATUS)) {
3734         rc = remap_monitor_rc(rc, xml_op, node, rsc, data_set);
3735     }
3736 
3737     if (expired && (rc != target_rc)) {
3738         const char *magic = crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC);
3739 
3740         if (interval_ms == 0) {
3741             crm_notice("Ignoring expired %s failure on %s "
3742                        CRM_XS " actual=%d expected=%d magic=%s",
3743                        task_key, node->details->uname, rc, target_rc, magic);
3744             goto done;
3745 
3746         } else if(node->details->online && node->details->unclean == FALSE) {
3747             /* Reschedule the recurring monitor. CancelXmlOp() won't work at
3748              * this stage, so as a hacky workaround, forcibly change the restart
3749              * digest so check_action_definition() does what we want later.
3750              *
3751              * @TODO We should skip this if there is a newer successful monitor.
3752              *       Also, this causes rescheduling only if the history entry
3753              *       has an op-digest (which the expire-non-blocked-failure
3754              *       scheduler regression test doesn't, but that may not be a
3755              *       realistic scenario in production).
3756              */
3757             crm_notice("Rescheduling %s after failure expired on %s "
3758                        CRM_XS " actual=%d expected=%d magic=%s",
3759                        task_key, node->details->uname, rc, target_rc, magic);
3760             crm_xml_add(xml_op, XML_LRM_ATTR_RESTART_DIGEST, "calculated-failure-timeout");
3761             goto done;
3762         }
3763     }
3764 
3765     /* If the executor reported an operation status of anything but done or
3766      * error, consider that final. But for done or error, we know better whether
3767      * it should be treated as a failure or not, because we know the expected
3768      * result.
3769      */
3770     if(status == PCMK_EXEC_DONE || status == PCMK_EXEC_ERROR) {
3771         status = determine_op_status(rsc, rc, target_rc, node, xml_op, on_fail, data_set);
3772         pe_rsc_trace(rsc, "Remapped %s status to %d", task_key, status);
3773     }
3774 
3775     switch (status) {
3776         case PCMK_EXEC_CANCELLED:
3777             // Should never happen
3778             pe_err("Resource history contains cancellation '%s' "
3779                    "(%s of %s on %s at %s)",
3780                    ID(xml_op), task, rsc->id, node->details->uname,
3781                    last_change_str(xml_op));
3782             goto done;
3783 
3784         case PCMK_EXEC_PENDING:
3785             if (!strcmp(task, CRMD_ACTION_START)) {
3786                 pe__set_resource_flags(rsc, pe_rsc_start_pending);
3787                 set_active(rsc);
3788 
3789             } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
3790                 rsc->role = RSC_ROLE_PROMOTED;
3791 
3792             } else if (!strcmp(task, CRMD_ACTION_MIGRATE) && node->details->unclean) {
3793                 /* If a pending migrate_to action is out on a unclean node,
3794                  * we have to force the stop action on the target. */
3795                 const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
3796                 pe_node_t *target = pe_find_node(data_set->nodes, migrate_target);
3797                 if (target) {
3798                     stop_action(rsc, target, FALSE);
3799                 }
3800             }
3801 
3802             if (rsc->pending_task == NULL) {
3803                 if ((interval_ms != 0) || strcmp(task, CRMD_ACTION_STATUS)) {
3804                     rsc->pending_task = strdup(task);
3805                     rsc->pending_node = node;
3806                 } else {
3807                     /* Pending probes are not printed, even if pending
3808                      * operations are requested. If someone ever requests that
3809                      * behavior, enable the below and the corresponding part of
3810                      * native.c:native_pending_task().
3811                      */
3812 #if 0
3813                     rsc->pending_task = strdup("probe");
3814                     rsc->pending_node = node;
3815 #endif
3816                 }
3817             }
3818             goto done;
3819 
3820         case PCMK_EXEC_DONE:
3821             pe_rsc_trace(rsc, "%s of %s on %s completed at %s " CRM_XS " id=%s",
3822                          task, rsc->id, node->details->uname,
3823                          last_change_str(xml_op), ID(xml_op));
3824             update_resource_state(rsc, node, xml_op, task, rc, *last_failure, on_fail, data_set);
3825             goto done;
3826 
3827         case PCMK_EXEC_NOT_INSTALLED:
3828             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3829             if (failure_strategy == action_fail_ignore) {
3830                 crm_warn("Cannot ignore failed %s of %s on %s: "
3831                          "Resource agent doesn't exist "
3832                          CRM_XS " status=%d rc=%d id=%s",
3833                          task, rsc->id, node->details->uname, status, rc,
3834                          ID(xml_op));
3835                 /* Also for printing it as "FAILED" by marking it as pe_rsc_failed later */
3836                 *on_fail = action_fail_migrate;
3837             }
3838             resource_location(parent, node, -INFINITY, "hard-error", data_set);
3839             unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
3840             goto done;
3841 
3842         case PCMK_EXEC_NOT_CONNECTED:
3843             if (pe__is_guest_or_remote_node(node)
3844                 && pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_managed)) {
3845                 /* We should never get into a situation where a managed remote
3846                  * connection resource is considered OK but a resource action
3847                  * behind the connection gets a "not connected" status. But as a
3848                  * fail-safe in case a bug or unusual circumstances do lead to
3849                  * that, ensure the remote connection is considered failed.
3850                  */
3851                 pe__set_resource_flags(node->details->remote_rsc,
3852                                        pe_rsc_failed|pe_rsc_stop);
3853             }
3854             break; // Not done, do error handling
3855 
3856         case PCMK_EXEC_ERROR:
3857         case PCMK_EXEC_ERROR_HARD:
3858         case PCMK_EXEC_ERROR_FATAL:
3859         case PCMK_EXEC_TIMEOUT:
3860         case PCMK_EXEC_NOT_SUPPORTED:
3861         case PCMK_EXEC_INVALID:
3862             break; // Not done, do error handling
3863 
3864         /* These should only be possible in fence action results, not operation
3865          * history, but have some handling in place as a fail-safe.
3866          */
3867         case PCMK_EXEC_NO_FENCE_DEVICE:
3868         case PCMK_EXEC_NO_SECRETS:
3869             status = PCMK_EXEC_ERROR_HARD;
3870             break; // Not done, do error handling
3871     }
3872 
3873     failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3874     if ((failure_strategy == action_fail_ignore)
3875         || (failure_strategy == action_fail_restart_container
3876             && !strcmp(task, CRMD_ACTION_STOP))) {
3877 
3878         crm_warn("Pretending failed %s (%s%s%s) of %s on %s at %s "
3879                  "succeeded " CRM_XS " rc=%d id=%s",
3880                  task, services_ocf_exitcode_str(rc),
3881                  (*exit_reason? ": " : ""), exit_reason, rsc->id,
3882                  node->details->uname, last_change_str(xml_op), rc,
3883                  ID(xml_op));
3884 
3885         update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure,
3886                               on_fail, data_set);
3887         crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
3888         pe__set_resource_flags(rsc, pe_rsc_failure_ignored);
3889 
3890         record_failed_op(xml_op, node, rsc, data_set);
3891 
3892         if ((failure_strategy == action_fail_restart_container)
3893             && cmp_on_fail(*on_fail, action_fail_recover) <= 0) {
3894             *on_fail = failure_strategy;
3895         }
3896 
3897     } else {
3898         unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail,
3899                               data_set);
3900 
3901         if (status == PCMK_EXEC_ERROR_HARD) {
3902             do_crm_log(rc != PCMK_OCF_NOT_INSTALLED?LOG_ERR:LOG_NOTICE,
3903                        "Preventing %s from restarting on %s because "
3904                        "of hard failure (%s%s%s)" CRM_XS " rc=%d id=%s",
3905                        parent->id, node->details->uname,
3906                        services_ocf_exitcode_str(rc),
3907                        (*exit_reason? ": " : ""), exit_reason,
3908                        rc, ID(xml_op));
3909             resource_location(parent, node, -INFINITY, "hard-error", data_set);
3910 
3911         } else if (status == PCMK_EXEC_ERROR_FATAL) {
3912             crm_err("Preventing %s from restarting anywhere because "
3913                     "of fatal failure (%s%s%s) " CRM_XS " rc=%d id=%s",
3914                     parent->id, services_ocf_exitcode_str(rc),
3915                     (*exit_reason? ": " : ""), exit_reason,
3916                     rc, ID(xml_op));
3917             resource_location(parent, NULL, -INFINITY, "fatal-error", data_set);
3918         }
3919     }
3920 
3921 done:
3922     pe_rsc_trace(rsc, "Resource %s after %s: role=%s, next=%s",
3923                  rsc->id, task, role2text(rsc->role),
3924                  role2text(rsc->next_role));
3925 }
3926 
3927 static void
3928 add_node_attrs(xmlNode *xml_obj, pe_node_t *node, bool overwrite,
     /* [previous][next][first][last][top][bottom][index][help] */
3929                pe_working_set_t *data_set)
3930 {
3931     const char *cluster_name = NULL;
3932 
3933     pe_rule_eval_data_t rule_data = {
3934         .node_hash = NULL,
3935         .role = RSC_ROLE_UNKNOWN,
3936         .now = data_set->now,
3937         .match_data = NULL,
3938         .rsc_data = NULL,
3939         .op_data = NULL
3940     };
3941 
3942     g_hash_table_insert(node->details->attrs,
3943                         strdup(CRM_ATTR_UNAME), strdup(node->details->uname));
3944 
3945     g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_ID),
3946                         strdup(node->details->id));
3947     if (pcmk__str_eq(node->details->id, data_set->dc_uuid, pcmk__str_casei)) {
3948         data_set->dc_node = node;
3949         node->details->is_dc = TRUE;
3950         g_hash_table_insert(node->details->attrs,
3951                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_TRUE));
3952     } else {
3953         g_hash_table_insert(node->details->attrs,
3954                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_FALSE));
3955     }
3956 
3957     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
3958     if (cluster_name) {
3959         g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_CLUSTER_NAME),
3960                             strdup(cluster_name));
3961     }
3962 
3963     pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_ATTR_SETS, &rule_data,
3964                                node->details->attrs, NULL, overwrite, data_set);
3965 
3966     if (pe_node_attribute_raw(node, CRM_ATTR_SITE_NAME) == NULL) {
3967         const char *site_name = pe_node_attribute_raw(node, "site-name");
3968 
3969         if (site_name) {
3970             g_hash_table_insert(node->details->attrs,
3971                                 strdup(CRM_ATTR_SITE_NAME),
3972                                 strdup(site_name));
3973 
3974         } else if (cluster_name) {
3975             /* Default to cluster-name if unset */
3976             g_hash_table_insert(node->details->attrs,
3977                                 strdup(CRM_ATTR_SITE_NAME),
3978                                 strdup(cluster_name));
3979         }
3980     }
3981 }
3982 
3983 static GList *
3984 extract_operations(const char *node, const char *rsc, xmlNode * rsc_entry, gboolean active_filter)
     /* [previous][next][first][last][top][bottom][index][help] */
3985 {
3986     int counter = -1;
3987     int stop_index = -1;
3988     int start_index = -1;
3989 
3990     xmlNode *rsc_op = NULL;
3991 
3992     GList *gIter = NULL;
3993     GList *op_list = NULL;
3994     GList *sorted_op_list = NULL;
3995 
3996     /* extract operations */
3997     op_list = NULL;
3998     sorted_op_list = NULL;
3999 
4000     for (rsc_op = pcmk__xe_first_child(rsc_entry);
4001          rsc_op != NULL; rsc_op = pcmk__xe_next(rsc_op)) {
4002 
4003         if (pcmk__str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP,
4004                          pcmk__str_none)) {
4005             crm_xml_add(rsc_op, "resource", rsc);
4006             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
4007             op_list = g_list_prepend(op_list, rsc_op);
4008         }
4009     }
4010 
4011     if (op_list == NULL) {
4012         /* if there are no operations, there is nothing to do */
4013         return NULL;
4014     }
4015 
4016     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
4017 
4018     /* create active recurring operations as optional */
4019     if (active_filter == FALSE) {
4020         return sorted_op_list;
4021     }
4022 
4023     op_list = NULL;
4024 
4025     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
4026 
4027     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
4028         xmlNode *rsc_op = (xmlNode *) gIter->data;
4029 
4030         counter++;
4031 
4032         if (start_index < stop_index) {
4033             crm_trace("Skipping %s: not active", ID(rsc_entry));
4034             break;
4035 
4036         } else if (counter < start_index) {
4037             crm_trace("Skipping %s: old", ID(rsc_op));
4038             continue;
4039         }
4040         op_list = g_list_append(op_list, rsc_op);
4041     }
4042 
4043     g_list_free(sorted_op_list);
4044     return op_list;
4045 }
4046 
4047 GList *
4048 find_operations(const char *rsc, const char *node, gboolean active_filter,
     /* [previous][next][first][last][top][bottom][index][help] */
4049                 pe_working_set_t * data_set)
4050 {
4051     GList *output = NULL;
4052     GList *intermediate = NULL;
4053 
4054     xmlNode *tmp = NULL;
4055     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
4056 
4057     pe_node_t *this_node = NULL;
4058 
4059     xmlNode *node_state = NULL;
4060 
4061     for (node_state = pcmk__xe_first_child(status); node_state != NULL;
4062          node_state = pcmk__xe_next(node_state)) {
4063 
4064         if (pcmk__str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
4065             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
4066 
4067             if (node != NULL && !pcmk__str_eq(uname, node, pcmk__str_casei)) {
4068                 continue;
4069             }
4070 
4071             this_node = pe_find_node(data_set->nodes, uname);
4072             if(this_node == NULL) {
4073                 CRM_LOG_ASSERT(this_node != NULL);
4074                 continue;
4075 
4076             } else if (pe__is_guest_or_remote_node(this_node)) {
4077                 determine_remote_online_status(data_set, this_node);
4078 
4079             } else {
4080                 determine_online_status(node_state, this_node, data_set);
4081             }
4082 
4083             if (this_node->details->online
4084                 || pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
4085                 /* offline nodes run no resources...
4086                  * unless stonith is enabled in which case we need to
4087                  *   make sure rsc start events happen after the stonith
4088                  */
4089                 xmlNode *lrm_rsc = NULL;
4090 
4091                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
4092                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
4093 
4094                 for (lrm_rsc = pcmk__xe_first_child(tmp); lrm_rsc != NULL;
4095                      lrm_rsc = pcmk__xe_next(lrm_rsc)) {
4096 
4097                     if (pcmk__str_eq((const char *)lrm_rsc->name,
4098                                      XML_LRM_TAG_RESOURCE, pcmk__str_none)) {
4099 
4100                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
4101 
4102                         if (rsc != NULL && !pcmk__str_eq(rsc_id, rsc, pcmk__str_casei)) {
4103                             continue;
4104                         }
4105 
4106                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
4107                         output = g_list_concat(output, intermediate);
4108                     }
4109                 }
4110             }
4111         }
4112     }
4113 
4114     return output;
4115 }

/* [previous][next][first][last][top][bottom][index][help] */