root/lib/pengine/unpack.c

/*
 * Copyright 2004-2020 the Pacemaker project contributors
 *
 * The version control history for this file may have further details.
 *
 * This source code is licensed under the GNU Lesser General Public License
 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
 */

#include <crm_internal.h>

#include <stdio.h>
#include <string.h>
#include <glib.h>
#include <time.h>

#include <crm/crm.h>
#include <crm/services.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/xml_internal.h>

#include <crm/common/util.h>
#include <crm/pengine/rules.h>
#include <crm/pengine/internal.h>
#include <crm/common/iso8601_internal.h>
#include <pe_status_private.h>

CRM_TRACE_INIT_DATA(pe_status);

/* This uses pcmk__set_flags_as()/pcmk__clear_flags_as() directly rather than
 * using pe__set_working_set_flags()/pe__clear_working_set_flags() so that the
 * flag is stringified more readably in log messages.
 */
#define set_config_flag(data_set, option, flag) do {                        \
        const char *scf_value = pe_pref((data_set)->config_hash, (option)); \
        if (scf_value != NULL) {                                            \
            if (crm_is_true(scf_value)) {                                   \
                (data_set)->flags = pcmk__set_flags_as(__func__, __LINE__,  \
                                    LOG_TRACE, "Working set",               \
                                    crm_system_name, (data_set)->flags,     \
                                    (flag), #flag);                         \
            } else {                                                        \
                (data_set)->flags = pcmk__clear_flags_as(__func__, __LINE__,\
                                    LOG_TRACE, "Working set",               \
                                    crm_system_name, (data_set)->flags,     \
                                    (flag), #flag);                         \
            }                                                               \
        }                                                                   \
    } while(0)

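/* For example, set_config_flag(data_set, "stonith-enabled",
 * pe_flag_stonith_enabled) looks up the "stonith-enabled" cluster option and
 * sets or clears pe_flag_stonith_enabled in data_set->flags accordingly,
 * leaving the flag untouched when the option is unset.
 */
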
static void unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
                          xmlNode **last_failure,
                          enum action_fail_response *failed,
                          pe_working_set_t *data_set);
static void determine_remote_online_status(pe_working_set_t *data_set,
                                           pe_node_t *this_node);
static void add_node_attrs(xmlNode *attrs, pe_node_t *node, bool overwrite,
                           pe_working_set_t *data_set);
static void determine_online_status(xmlNode *node_state, pe_node_t *this_node,
                                    pe_working_set_t *data_set);

static void unpack_lrm_resources(pe_node_t *node, xmlNode *lrm_state,
                                 pe_working_set_t *data_set);


// Bitmask for warnings we only want to print once
uint32_t pe_wo = 0;

static gboolean
is_dangling_guest_node(pe_node_t *node)
{
    /* We are looking for a remote node that was supposed to be mapped to a
     * container resource, but all traces of that container have disappeared
     * from both the configuration and the status section. */
    if (pe__is_guest_or_remote_node(node) &&
        node->details->remote_rsc &&
        node->details->remote_rsc->container == NULL &&
        pcmk_is_set(node->details->remote_rsc->flags,
                    pe_rsc_orphan_container_filler)) {
        return TRUE;
    }

    return FALSE;
}


/*!
 * \brief Schedule a fence action for a node
 *
 * \param[in,out] data_set  Current working set of cluster
 * \param[in,out] node      Node to fence
 * \param[in]     reason    Text description of why fencing is needed
 * \param[in]     priority_delay  Whether to consider `priority-fencing-delay`
 */
void
pe_fence_node(pe_working_set_t * data_set, pe_node_t * node,
              const char *reason, bool priority_delay)
{
    CRM_CHECK(node, return);

    /* A guest node is fenced by marking its container as failed */
    if (pe__is_guest_node(node)) {
        pe_resource_t *rsc = node->details->remote_rsc->container;

        if (!pcmk_is_set(rsc->flags, pe_rsc_failed)) {
            if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
                crm_notice("Not fencing guest node %s "
                           "(otherwise would because %s): "
                           "its guest resource %s is unmanaged",
                           node->details->uname, reason, rsc->id);
            } else {
                crm_warn("Guest node %s will be fenced "
                         "(by recovering its guest resource %s): %s",
                         node->details->uname, rsc->id, reason);

                /* We don't mark the node as unclean because that would prevent the
                 * node from running resources. We want to allow it to run resources
                 * in this transition if the recovery succeeds.
                 */
                node->details->remote_requires_reset = TRUE;
                pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
            }
        }

    } else if (is_dangling_guest_node(node)) {
        crm_info("Cleaning up dangling connection for guest node %s: "
                 "fencing was already done because %s, "
                 "and guest resource no longer exists",
                 node->details->uname, reason);
        pe__set_resource_flags(node->details->remote_rsc,
                               pe_rsc_failed|pe_rsc_stop);

    } else if (pe__is_remote_node(node)) {
        pe_resource_t *rsc = node->details->remote_rsc;

        if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
            crm_notice("Not fencing remote node %s "
                       "(otherwise would because %s): connection is unmanaged",
                       node->details->uname, reason);
        } else if (node->details->remote_requires_reset == FALSE) {
            node->details->remote_requires_reset = TRUE;
            crm_warn("Remote node %s %s: %s",
                     node->details->uname,
                     pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
                     reason);
        }
        node->details->unclean = TRUE;
        // No need to apply `priority-fencing-delay` for remote nodes
        pe_fence_op(node, NULL, TRUE, reason, FALSE, data_set);

    } else if (node->details->unclean) {
        crm_trace("Cluster node %s %s because %s",
                  node->details->uname,
                  pe_can_fence(data_set, node)? "would also be fenced" : "also is unclean",
                  reason);

    } else {
        crm_warn("Cluster node %s %s: %s",
                 node->details->uname,
                 pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
                 reason);
        node->details->unclean = TRUE;
        pe_fence_op(node, NULL, TRUE, reason, priority_delay, data_set);
    }
}

// @TODO xpaths can't handle templates, rules, or id-refs

// nvpair with provides or requires set to unfencing
#define XPATH_UNFENCING_NVPAIR XML_CIB_TAG_NVPAIR                \
    "[(@" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_PROVIDES "'"    \
    "or @" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_REQUIRES "') " \
    "and @" XML_NVPAIR_ATTR_VALUE "='unfencing']"

// unfencing in rsc_defaults or any resource
#define XPATH_ENABLE_UNFENCING \
    "/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RESOURCES   \
    "//" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR                         \
    "|/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RSCCONFIG  \
    "/" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR

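/* Assuming the usual values of the XML tag/attribute constants, the query
 * above expands to roughly:
 *   /cib/configuration/resources//meta_attributes/nvpair[...unfencing...]
 *   |/cib/configuration/rsc_defaults/meta_attributes/nvpair[...unfencing...]
 */

/*!
 * \internal
 * \brief Set a working set flag if an XPath query matches the input CIB
 *
 * \param[in]     flag      Flag to set if the query returns at least one result
 * \param[in]     xpath     XPath expression to search data_set->input for
 * \param[in,out] data_set  Working set whose flags should be updated
 */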
static void
set_if_xpath(uint64_t flag, const char *xpath, pe_working_set_t *data_set)
{
    xmlXPathObjectPtr result = NULL;

    if (!pcmk_is_set(data_set->flags, flag)) {
        result = xpath_search(data_set->input, xpath);
        if (result && (numXpathResults(result) > 0)) {
            pe__set_working_set_flags(data_set, flag);
        }
        freeXpathObject(result);
    }
}

gboolean
unpack_config(xmlNode * config, pe_working_set_t * data_set)
{
    const char *value = NULL;
    GHashTable *config_hash = crm_str_table_new();

    pe_rule_eval_data_t rule_data = {
        .node_hash = NULL,
        .role = RSC_ROLE_UNKNOWN,
        .now = data_set->now,
        .match_data = NULL,
        .rsc_data = NULL,
        .op_data = NULL
    };

    data_set->config_hash = config_hash;

    pe__unpack_dataset_nvpairs(config, XML_CIB_TAG_PROPSET, &rule_data, config_hash,
                               CIB_OPTIONS_FIRST, FALSE, data_set);

    verify_pe_options(data_set->config_hash);

    set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
    if (!pcmk_is_set(data_set->flags, pe_flag_startup_probes)) {
        crm_info("Startup probes: disabled (dangerous)");
    }

    value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
    if (value && crm_is_true(value)) {
        crm_info("Watchdog-based self-fencing will be performed via SBD if "
                 "fencing is required and stonith-watchdog-timeout is nonzero");
        pe__set_working_set_flags(data_set, pe_flag_have_stonith_resource);
    }

    /* Set certain flags via xpath here, so they can be used before the relevant
     * configuration sections are unpacked.
     */
    set_if_xpath(pe_flag_enable_unfencing, XPATH_ENABLE_UNFENCING, data_set);

    value = pe_pref(data_set->config_hash, "stonith-timeout");
    data_set->stonith_timeout = (int) crm_parse_interval_spec(value);
    crm_debug("STONITH timeout: %d", data_set->stonith_timeout);

    set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
    crm_debug("STONITH of failed nodes is %s",
              pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)? "enabled" : "disabled");

    data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
    if (!strcmp(data_set->stonith_action, "poweroff")) {
        pe_warn_once(pe_wo_poweroff,
                     "Support for stonith-action of 'poweroff' is deprecated "
                     "and will be removed in a future release (use 'off' instead)");
        data_set->stonith_action = "off";
    }
    crm_trace("STONITH will %s nodes", data_set->stonith_action);

    set_config_flag(data_set, "concurrent-fencing", pe_flag_concurrent_fencing);
    crm_debug("Concurrent fencing is %s",
              pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)? "enabled" : "disabled");

    value = pe_pref(data_set->config_hash,
                    XML_CONFIG_ATTR_PRIORITY_FENCING_DELAY);
    if (value) {
        data_set->priority_fencing_delay = crm_parse_interval_spec(value) / 1000;
        crm_trace("Priority fencing delay is %ds", data_set->priority_fencing_delay);
    }

    set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
    crm_debug("Stop all active resources: %s",
              pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_stop_everything)));

    set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
    if (pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)) {
        crm_debug("Cluster is symmetric - resources can run anywhere by default");
    }

    value = pe_pref(data_set->config_hash, "no-quorum-policy");

    if (pcmk__str_eq(value, "ignore", pcmk__str_casei)) {
        data_set->no_quorum_policy = no_quorum_ignore;

    } else if (pcmk__str_eq(value, "freeze", pcmk__str_casei)) {
        data_set->no_quorum_policy = no_quorum_freeze;

    } else if (pcmk__str_eq(value, "demote", pcmk__str_casei)) {
        data_set->no_quorum_policy = no_quorum_demote;

    } else if (pcmk__str_eq(value, "suicide", pcmk__str_casei)) {
        if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
            int do_panic = 0;

            crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC,
                                  &do_panic);
            if (do_panic || pcmk_is_set(data_set->flags, pe_flag_have_quorum)) {
                data_set->no_quorum_policy = no_quorum_suicide;
            } else {
                crm_notice("Resetting no-quorum-policy to 'stop': cluster has never had quorum");
                data_set->no_quorum_policy = no_quorum_stop;
            }
        } else {
            pcmk__config_err("Resetting no-quorum-policy to 'stop' because "
                             "fencing is disabled");
            data_set->no_quorum_policy = no_quorum_stop;
        }

    } else {
        data_set->no_quorum_policy = no_quorum_stop;
    }

    switch (data_set->no_quorum_policy) {
        case no_quorum_freeze:
            crm_debug("On loss of quorum: Freeze resources");
            break;
        case no_quorum_stop:
            crm_debug("On loss of quorum: Stop ALL resources");
            break;
        case no_quorum_demote:
            crm_debug("On loss of quorum: "
                      "Demote promotable resources and stop other resources");
            break;
        case no_quorum_suicide:
            crm_notice("On loss of quorum: Fence all remaining nodes");
            break;
        case no_quorum_ignore:
            crm_notice("On loss of quorum: Ignore");
            break;
    }

    set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
    crm_trace("Orphan resources are %s",
              pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)? "stopped" : "ignored");

    set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
    crm_trace("Orphan resource actions are %s",
              pcmk_is_set(data_set->flags, pe_flag_stop_action_orphans)? "stopped" : "ignored");

    set_config_flag(data_set, "remove-after-stop", pe_flag_remove_after_stop);
    crm_trace("Stopped resources are removed from the status section: %s",
              pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_remove_after_stop)));

    set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
    crm_trace("Maintenance mode: %s",
              pcmk__btoa(pcmk_is_set(data_set->flags, pe_flag_maintenance_mode)));

    set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
    crm_trace("Start failures are %s",
              pcmk_is_set(data_set->flags, pe_flag_start_failure_fatal)? "always fatal" : "handled by failcount");

    if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
        set_config_flag(data_set, "startup-fencing", pe_flag_startup_fencing);
    }
    if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
        crm_trace("Unseen nodes will be fenced");
    } else {
        pe_warn_once(pe_wo_blind, "Blind faith: not fencing unseen nodes");
    }

    pcmk__score_red = char2score(pe_pref(data_set->config_hash, "node-health-red"));
    pcmk__score_green = char2score(pe_pref(data_set->config_hash, "node-health-green"));
    pcmk__score_yellow = char2score(pe_pref(data_set->config_hash, "node-health-yellow"));

    crm_debug("Node scores: 'red' = %s, 'yellow' = %s, 'green' = %s",
             pe_pref(data_set->config_hash, "node-health-red"),
             pe_pref(data_set->config_hash, "node-health-yellow"),
             pe_pref(data_set->config_hash, "node-health-green"));

    data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
    crm_trace("Placement strategy: %s", data_set->placement_strategy);

    set_config_flag(data_set, "shutdown-lock", pe_flag_shutdown_lock);
    crm_trace("Resources will%s be locked to cleanly shut down nodes",
              (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)? "" : " not"));
    if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
        value = pe_pref(data_set->config_hash,
                        XML_CONFIG_ATTR_SHUTDOWN_LOCK_LIMIT);
        data_set->shutdown_lock = crm_parse_interval_spec(value) / 1000;
        crm_trace("Shutdown locks expire after %us", data_set->shutdown_lock);
    }

    return TRUE;
}

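/*!
 * \internal
 * \brief Free an operation digest cache entry (GHashTable value destructor)
 *
 * \param[in] ptr  op_digest_cache_t entry to free
 */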
static void
destroy_digest_cache(gpointer ptr)
{
    op_digest_cache_t *data = ptr;

    free_xml(data->params_all);
    free_xml(data->params_secure);
    free_xml(data->params_restart);

    free(data->digest_all_calc);
    free(data->digest_restart_calc);
    free(data->digest_secure_calc);

    free(data);
}

pe_node_t *
pe_create_node(const char *id, const char *uname, const char *type,
               const char *score, pe_working_set_t * data_set)
{
    pe_node_t *new_node = NULL;

    if (pe_find_node(data_set->nodes, uname) != NULL) {
        pcmk__config_warn("More than one node entry has name '%s'", uname);
    }

    new_node = calloc(1, sizeof(pe_node_t));
    if (new_node == NULL) {
        return NULL;
    }

    new_node->weight = char2score(score);
    new_node->fixed = FALSE;
    new_node->details = calloc(1, sizeof(struct pe_node_shared_s));

    if (new_node->details == NULL) {
        free(new_node);
        return NULL;
    }

    crm_trace("Creating node for entry %s/%s", uname, id);
    new_node->details->id = id;
    new_node->details->uname = uname;
    new_node->details->online = FALSE;
    new_node->details->shutdown = FALSE;
    new_node->details->rsc_discovery_enabled = TRUE;
    new_node->details->running_rsc = NULL;
    new_node->details->type = node_ping;

    if (pcmk__str_eq(type, "remote", pcmk__str_casei)) {
        new_node->details->type = node_remote;
        pe__set_working_set_flags(data_set, pe_flag_have_remote_nodes);
    } else if (pcmk__str_eq(type, "member", pcmk__str_null_matches | pcmk__str_casei)) {
        new_node->details->type = node_member;
    }

    new_node->details->attrs = crm_str_table_new();

    if (pe__is_guest_or_remote_node(new_node)) {
        g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
                            strdup("remote"));
    } else {
        g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
                            strdup("cluster"));
    }

    new_node->details->utilization = crm_str_table_new();

    new_node->details->digest_cache = g_hash_table_new_full(crm_str_hash,
                                                            g_str_equal, free,
                                                            destroy_digest_cache);

    data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node, sort_node_uname);
    return new_node;
}

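/*!
 * \internal
 * \brief Check a resource's meta-attributes for a guest node definition
 *
 * If the resource XML carries a remote-node meta-attribute (and no resource
 * with that name already exists), add an ocf:pacemaker:remote primitive for
 * the guest node's connection to the configuration.
 *
 * \param[in]     xml_obj  Resource XML to check
 * \param[in,out] parent   XML to add any new connection primitive to
 * \param[in,out] data     Working set containing the known resources
 *
 * \return Name of guest node defined by the resource, or NULL if none
 *
 * A guest node definition looks roughly like this (IDs and values here are
 * purely illustrative):
 *
 *   <primitive id="vm1" class="ocf" provider="heartbeat" type="VirtualDomain">
 *     <meta_attributes id="vm1-meta">
 *       <nvpair id="vm1-node" name="remote-node" value="guest1"/>
 *       <nvpair id="vm1-addr" name="remote-addr" value="192.168.122.10"/>
 *     </meta_attributes>
 *   </primitive>
 */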
static const char *
expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, pe_working_set_t *data)
{
    xmlNode *attr_set = NULL;
    xmlNode *attr = NULL;

    const char *container_id = ID(xml_obj);
    const char *remote_name = NULL;
    const char *remote_server = NULL;
    const char *remote_port = NULL;
    const char *connect_timeout = "60s";
    const char *remote_allow_migrate = NULL;
    const char *is_managed = NULL;

    for (attr_set = pcmk__xe_first_child(xml_obj); attr_set != NULL;
         attr_set = pcmk__xe_next(attr_set)) {

        if (!pcmk__str_eq((const char *)attr_set->name, XML_TAG_META_SETS,
                          pcmk__str_casei)) {
            continue;
        }

        for (attr = pcmk__xe_first_child(attr_set); attr != NULL;
             attr = pcmk__xe_next(attr)) {
            const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
            const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);

            if (pcmk__str_eq(name, XML_RSC_ATTR_REMOTE_NODE, pcmk__str_casei)) {
                remote_name = value;
            } else if (pcmk__str_eq(name, "remote-addr", pcmk__str_casei)) {
                remote_server = value;
            } else if (pcmk__str_eq(name, "remote-port", pcmk__str_casei)) {
                remote_port = value;
            } else if (pcmk__str_eq(name, "remote-connect-timeout", pcmk__str_casei)) {
                connect_timeout = value;
            } else if (pcmk__str_eq(name, "remote-allow-migrate", pcmk__str_casei)) {
                remote_allow_migrate = value;
            } else if (pcmk__str_eq(name, XML_RSC_ATTR_MANAGED, pcmk__str_casei)) {
                is_managed = value;
            }
        }
    }

    if (remote_name == NULL) {
        return NULL;
    }

    if (pe_find_resource(data->resources, remote_name) != NULL) {
        return NULL;
    }

    pe_create_remote_xml(parent, remote_name, container_id,
                         remote_allow_migrate, is_managed,
                         connect_timeout, remote_server, remote_port);
    return remote_name;
}

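/*!
 * \internal
 * \brief Apply the startup-fencing policy to a newly created node
 *
 * Mark the node as provisionally unclean (when startup-fencing is enabled)
 * and as unseen, until its status entry is processed.
 *
 * \param[in]     data_set  Current working set of cluster
 * \param[in,out] new_node  Node to initialize
 */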
static void
handle_startup_fencing(pe_working_set_t *data_set, pe_node_t *new_node)
{
    if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
        /* Ignore fencing for remote nodes that don't have a connection resource
         * associated with them. This happens when remote node entries get left
         * in the nodes section after the connection resource is removed.
         */
        return;
    }

    if (pcmk_is_set(data_set->flags, pe_flag_startup_fencing)) {
        // All nodes are unclean until we've seen their status entry
        new_node->details->unclean = TRUE;

    } else {
        // Blind faith ...
        new_node->details->unclean = FALSE;
    }

    /* We need to be able to determine whether a node's status section exists,
     * separately from whether the node is unclean. */
    new_node->details->unseen = TRUE;
}

gboolean
unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
{
    xmlNode *xml_obj = NULL;
    pe_node_t *new_node = NULL;
    const char *id = NULL;
    const char *uname = NULL;
    const char *type = NULL;
    const char *score = NULL;

    pe_rule_eval_data_t rule_data = {
        .node_hash = NULL,
        .role = RSC_ROLE_UNKNOWN,
        .now = data_set->now,
        .match_data = NULL,
        .rsc_data = NULL,
        .op_data = NULL
    };

    for (xml_obj = pcmk__xe_first_child(xml_nodes); xml_obj != NULL;
         xml_obj = pcmk__xe_next(xml_obj)) {

        if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, pcmk__str_none)) {
            new_node = NULL;

            id = crm_element_value(xml_obj, XML_ATTR_ID);
            uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
            type = crm_element_value(xml_obj, XML_ATTR_TYPE);
            score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
            crm_trace("Processing node %s/%s", uname, id);

            if (id == NULL) {
                pcmk__config_err("Ignoring <" XML_CIB_TAG_NODE
                                 "> entry in configuration without id");
                continue;
            }
            new_node = pe_create_node(id, uname, type, score, data_set);

            if (new_node == NULL) {
                return FALSE;
            }

/*              if(data_set->have_quorum == FALSE */
/*                 && data_set->no_quorum_policy == no_quorum_stop) { */
/*                      /\* start shutting resources down *\/ */
/*                      new_node->weight = -INFINITY; */
/*              } */

            handle_startup_fencing(data_set, new_node);

            add_node_attrs(xml_obj, new_node, FALSE, data_set);
            pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_UTILIZATION, &rule_data,
                                       new_node->details->utilization, NULL,
                                       FALSE, data_set);

            crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
        }
    }

    if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
        crm_info("Creating a fake local node");
        pe_create_node(data_set->localhost, data_set->localhost, NULL, 0,
                       data_set);
    }

    return TRUE;
}

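/*!
 * \internal
 * \brief Link a resource to its container resource, if any
 *
 * Recurse into children; for a primitive with the container meta-attribute,
 * set rsc->container and add the resource to the container's list of fillers.
 *
 * \param[in,out] rsc       Resource to check
 * \param[in,out] data_set  Working set containing all resources
 */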
static void
setup_container(pe_resource_t * rsc, pe_working_set_t * data_set)
{
    const char *container_id = NULL;

    if (rsc->children) {
        GListPtr gIter = rsc->children;

        for (; gIter != NULL; gIter = gIter->next) {
            pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;

            setup_container(child_rsc, data_set);
        }
        return;
    }

    container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
    if (container_id && !pcmk__str_eq(container_id, rsc->id, pcmk__str_casei)) {
        pe_resource_t *container = pe_find_resource(data_set->resources, container_id);

        if (container) {
            rsc->container = container;
            pe__set_resource_flags(container, pe_rsc_is_container);
            container->fillers = g_list_append(container->fillers, rsc);
            pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
        } else {
            pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
        }
    }
}

gboolean
unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
{
    xmlNode *xml_obj = NULL;

    /* Create remote nodes and guest nodes from the resource configuration
     * before unpacking resources.
     */
    for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
         xml_obj = pcmk__xe_next(xml_obj)) {

        const char *new_node_id = NULL;

        /* Check for remote nodes, which are defined by ocf:pacemaker:remote
         * primitives.
         */
        if (xml_contains_remote_node(xml_obj)) {
            new_node_id = ID(xml_obj);
            /* The "pe_find_node" check is here to make sure we don't iterate over
             * an expanded node that has already been added to the node list. */
            if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
                crm_trace("Found remote node %s defined by resource %s",
                          new_node_id, ID(xml_obj));
                pe_create_node(new_node_id, new_node_id, "remote", NULL,
                               data_set);
            }
            continue;
        }

        /* Check for guest nodes, which are defined by special meta-attributes
         * of a primitive of any type (for example, VirtualDomain or Xen).
         */
        if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RESOURCE, pcmk__str_none)) {
            /* This will add an ocf:pacemaker:remote primitive to the
             * configuration for the guest node's connection, to be unpacked
             * later.
             */
            new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, data_set);
            if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
                crm_trace("Found guest node %s in resource %s",
                          new_node_id, ID(xml_obj));
                pe_create_node(new_node_id, new_node_id, "remote", NULL,
                               data_set);
            }
            continue;
        }

        /* Check for guest nodes inside a group. Clones are currently not
         * supported as guest nodes.
         */
        if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_GROUP, pcmk__str_none)) {
            xmlNode *xml_obj2 = NULL;
            for (xml_obj2 = pcmk__xe_first_child(xml_obj); xml_obj2 != NULL;
                 xml_obj2 = pcmk__xe_next(xml_obj2)) {

                new_node_id = expand_remote_rsc_meta(xml_obj2, xml_resources, data_set);

                if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
                    crm_trace("Found guest node %s in resource %s inside group %s",
                              new_node_id, ID(xml_obj2), ID(xml_obj));
                    pe_create_node(new_node_id, new_node_id, "remote", NULL,
                                   data_set);
                }
            }
        }
    }
    return TRUE;
}

/* Call this after all the nodes and resources have been unpacked, but before
 * the status section is read.
 *
 * A remote node's online status is reflected by the state of its connection
 * resource. We link the remote node to that connection resource here, so the
 * resource is easy to reach during scheduler calculations.
 */
static void
link_rsc2remotenode(pe_working_set_t *data_set, pe_resource_t *new_rsc)
{
    pe_node_t *remote_node = NULL;

    if (new_rsc->is_remote_node == FALSE) {
        return;
    }

    if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
        /* remote_nodes and remote_resources are not linked in quick location calculations */
        return;
    }

    remote_node = pe_find_node(data_set->nodes, new_rsc->id);
    CRM_CHECK(remote_node != NULL, return;);

    pe_rsc_trace(new_rsc, "Linking remote connection resource %s to node %s",
                 new_rsc->id, remote_node->details->uname);
    remote_node->details->remote_rsc = new_rsc;

    if (new_rsc->container == NULL) {
        /* Handle start-up fencing for remote nodes (as opposed to guest nodes)
         * the same as is done for cluster nodes.
         */
        handle_startup_fencing(data_set, remote_node);

    } else {
        /* pe_create_node() marks the new node as "remote" or "cluster"; now
         * that we know the node is a guest node, update it correctly.
         */
        g_hash_table_replace(remote_node->details->attrs, strdup(CRM_ATTR_KIND),
                             strdup("container"));
    }
}

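/*!
 * \internal
 * \brief Free a tag object (GHashTable value destructor)
 *
 * \param[in] data  pe_tag_t to free, along with its ID and reference list
 */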
static void
destroy_tag(gpointer data)
{
    pe_tag_t *tag = data;

    if (tag) {
        free(tag->id);
        g_list_free_full(tag->refs, free);
        free(tag);
    }
}

/*!
 * \internal
 * \brief Parse configuration XML for resource information
 *
 * \param[in]     xml_resources  Top of resource configuration XML
 * \param[in,out] data_set       Where to put resource information
 *
 * \return TRUE
 *
 * \note unpack_remote_nodes() MUST be called before this, so that the nodes can
 *       be used when common_unpack() calls resource_location()
 */
gboolean
unpack_resources(xmlNode * xml_resources, pe_working_set_t * data_set)
{
    xmlNode *xml_obj = NULL;
    GListPtr gIter = NULL;

    data_set->template_rsc_sets = g_hash_table_new_full(crm_str_hash,
                                                        g_str_equal, free,
                                                        destroy_tag);

    for (xml_obj = pcmk__xe_first_child(xml_resources); xml_obj != NULL;
         xml_obj = pcmk__xe_next(xml_obj)) {

        pe_resource_t *new_rsc = NULL;

        if (pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE, pcmk__str_none)) {
            const char *template_id = ID(xml_obj);

            if (template_id && g_hash_table_lookup_extended(data_set->template_rsc_sets,
                                                            template_id, NULL, NULL) == FALSE) {
                /* Record the template's ID anyway, so we know it exists. */
                g_hash_table_insert(data_set->template_rsc_sets, strdup(template_id), NULL);
            }
            continue;
        }

        crm_trace("Beginning unpack... <%s id=%s... >", crm_element_name(xml_obj), ID(xml_obj));
        if (common_unpack(xml_obj, &new_rsc, NULL, data_set) && (new_rsc != NULL)) {
            data_set->resources = g_list_append(data_set->resources, new_rsc);
            pe_rsc_trace(new_rsc, "Added resource %s", new_rsc->id);

        } else {
            pcmk__config_err("Ignoring <%s> resource '%s' "
                             "because configuration is invalid",
                             crm_element_name(xml_obj), crm_str(ID(xml_obj)));
            if (new_rsc != NULL && new_rsc->fns != NULL) {
                new_rsc->fns->free(new_rsc);
            }
        }
    }

    for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
        pe_resource_t *rsc = (pe_resource_t *) gIter->data;

        setup_container(rsc, data_set);
        link_rsc2remotenode(data_set, rsc);
    }

    data_set->resources = g_list_sort(data_set->resources, sort_rsc_priority);
    if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
        /* Ignore */

    } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
               && !pcmk_is_set(data_set->flags, pe_flag_have_stonith_resource)) {

        pcmk__config_err("Resource start-up disabled since no STONITH resources have been defined");
        pcmk__config_err("Either configure some or disable STONITH with the stonith-enabled option");
        pcmk__config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
    }

    return TRUE;
}

gboolean
unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
{
    xmlNode *xml_tag = NULL;

    data_set->tags = g_hash_table_new_full(crm_str_hash, g_str_equal, free,
                                           destroy_tag);

    for (xml_tag = pcmk__xe_first_child(xml_tags); xml_tag != NULL;
         xml_tag = pcmk__xe_next(xml_tag)) {

        xmlNode *xml_obj_ref = NULL;
        const char *tag_id = ID(xml_tag);

        if (!pcmk__str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, pcmk__str_none)) {
            continue;
        }

        if (tag_id == NULL) {
            pcmk__config_err("Ignoring <%s> without " XML_ATTR_ID,
                             crm_element_name(xml_tag));
            continue;
        }

        for (xml_obj_ref = pcmk__xe_first_child(xml_tag); xml_obj_ref != NULL;
             xml_obj_ref = pcmk__xe_next(xml_obj_ref)) {

            const char *obj_ref = ID(xml_obj_ref);

            if (!pcmk__str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, pcmk__str_none)) {
                continue;
            }

            if (obj_ref == NULL) {
                pcmk__config_err("Ignoring <%s> for tag '%s' without " XML_ATTR_ID,
                                 crm_element_name(xml_obj_ref), tag_id);
                continue;
            }

            if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
                return FALSE;
            }
        }
    }

    return TRUE;
}

/* The ticket state section:
 * "/cib/status/tickets/ticket_state" */
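/* A ticket_state entry looks roughly like this (values are illustrative):
 *
 *   <ticket_state id="ticketA" granted="true" last-granted="1605000000"
 *                 standby="false"/>
 */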
static gboolean
unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
{
    const char *ticket_id = NULL;
    const char *granted = NULL;
    const char *last_granted = NULL;
    const char *standby = NULL;
    xmlAttrPtr xIter = NULL;

    pe_ticket_t *ticket = NULL;

    ticket_id = ID(xml_ticket);
    if (pcmk__str_empty(ticket_id)) {
        return FALSE;
    }

    crm_trace("Processing ticket state for %s", ticket_id);

    ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
    if (ticket == NULL) {
        ticket = ticket_new(ticket_id, data_set);
        if (ticket == NULL) {
            return FALSE;
        }
    }

    for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
        const char *prop_name = (const char *)xIter->name;
        const char *prop_value = crm_element_value(xml_ticket, prop_name);

        if (pcmk__str_eq(prop_name, XML_ATTR_ID, pcmk__str_none)) {
            continue;
        }
        g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
    }

    granted = g_hash_table_lookup(ticket->state, "granted");
    if (granted && crm_is_true(granted)) {
        ticket->granted = TRUE;
        crm_info("We have ticket '%s'", ticket->id);
    } else {
        ticket->granted = FALSE;
        crm_info("We do not have ticket '%s'", ticket->id);
    }

    last_granted = g_hash_table_lookup(ticket->state, "last-granted");
    if (last_granted) {
        ticket->last_granted = crm_parse_int(last_granted, 0);
    }

    standby = g_hash_table_lookup(ticket->state, "standby");
    if (standby && crm_is_true(standby)) {
        ticket->standby = TRUE;
        if (ticket->granted) {
            crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
        }
    } else {
        ticket->standby = FALSE;
    }

    crm_trace("Done with ticket state for %s", ticket_id);

    return TRUE;
}

static gboolean
unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
{
    xmlNode *xml_obj = NULL;

    for (xml_obj = pcmk__xe_first_child(xml_tickets); xml_obj != NULL;
         xml_obj = pcmk__xe_next(xml_obj)) {

        if (!pcmk__str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, pcmk__str_none)) {
            continue;
        }
        unpack_ticket_state(xml_obj, data_set);
    }

    return TRUE;
}

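/*!
 * \internal
 * \brief Unpack transient attributes and related state for a remote node
 *
 * Given a node_state entry for a Pacemaker Remote node, unpack its transient
 * attributes and derive the node's shutdown, standby, maintenance, and
 * resource-discovery settings from them.
 *
 * \param[in,out] this_node  Remote or guest node to update
 * \param[in]     state      node_state XML for the node
 * \param[in,out] data_set   Current working set of cluster
 */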
static void
unpack_handle_remote_attrs(pe_node_t *this_node, xmlNode *state, pe_working_set_t * data_set)
{
    const char *resource_discovery_enabled = NULL;
    xmlNode *attrs = NULL;
    pe_resource_t *rsc = NULL;

    if (!pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
        return;
    }

    if ((this_node == NULL) || !pe__is_guest_or_remote_node(this_node)) {
        return;
    }
    crm_trace("Processing remote node id=%s, uname=%s", this_node->details->id, this_node->details->uname);

    this_node->details->remote_maintenance =
        crm_atoi(crm_element_value(state, XML_NODE_IS_MAINTENANCE), "0");

    rsc = this_node->details->remote_rsc;
    if (this_node->details->remote_requires_reset == FALSE) {
        this_node->details->unclean = FALSE;
        this_node->details->unseen = FALSE;
    }
    attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
    add_node_attrs(attrs, this_node, TRUE, data_set);

    if (pe__shutdown_requested(this_node)) {
        crm_info("Node %s is shutting down", this_node->details->uname);
        this_node->details->shutdown = TRUE;
        if (rsc) {
            rsc->next_role = RSC_ROLE_STOPPED;
        }
    }

    if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
        crm_info("Node %s is in standby-mode", this_node->details->uname);
        this_node->details->standby = TRUE;
    }

    if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance")) ||
        ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_managed))) {
        crm_info("Node %s is in maintenance-mode", this_node->details->uname);
        this_node->details->maintenance = TRUE;
    }

    resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
    if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
        if (pe__is_remote_node(this_node)
            && !pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
            crm_warn("Ignoring %s attribute on remote node %s because stonith is disabled",
                     XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
        } else {
            /* This is either a remote node with fencing enabled, or a guest
             * node. We don't care whether fencing is enabled when fencing guest
             * nodes, because they are "fenced" by recovering their containing
             * resource.
             */
            crm_info("Node %s has resource discovery disabled", this_node->details->uname);
            this_node->details->rsc_discovery_enabled = FALSE;
        }
    }
}

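/*!
 * \internal
 * \brief Unpack resource history for nodes whose state can now be determined
 *
 * Remote and guest nodes cannot be processed until the state of their
 * connection resource (and, for guest nodes, its container) is known, which
 * in turn depends on resource history unpacked on other nodes. The caller
 * therefore invokes this repeatedly until it returns false, then once more
 * with \p fence set to catch any nodes that remain.
 *
 * \param[in]     status    CIB status section XML
 * \param[in]     fence     Whether to process unseen and unhealthy nodes too
 * \param[in,out] data_set  Current working set of cluster
 *
 * \return true if any node's resource history was newly processed
 */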
static bool
unpack_node_loop(xmlNode * status, bool fence, pe_working_set_t * data_set)
{
    bool changed = false;
    xmlNode *lrm_rsc = NULL;

    for (xmlNode *state = pcmk__xe_first_child(status); state != NULL;
         state = pcmk__xe_next(state)) {

        const char *id = NULL;
        const char *uname = NULL;
        pe_node_t *this_node = NULL;
        bool process = FALSE;

        if (!pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
            continue;
        }

        id = crm_element_value(state, XML_ATTR_ID);
        uname = crm_element_value(state, XML_ATTR_UNAME);
        this_node = pe_find_node_any(data_set->nodes, id, uname);

        if (this_node == NULL) {
            crm_info("Node %s is unknown", id);
            continue;

        } else if (this_node->details->unpacked) {
            crm_trace("Node %s was already processed", id);
            continue;

        } else if (!pe__is_guest_or_remote_node(this_node)
                   && pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
            // A redundant test, but preserves the order for regression tests
            process = TRUE;

        } else if (pe__is_guest_or_remote_node(this_node)) {
            bool check = FALSE;
            pe_resource_t *rsc = this_node->details->remote_rsc;

            if (fence) {
                check = TRUE;

            } else if (rsc == NULL) {
                /* Not ready yet */

            } else if (pe__is_guest_node(this_node)
                       && rsc->role == RSC_ROLE_STARTED
                       && rsc->container->role == RSC_ROLE_STARTED) {
                /* Both the connection and its containing resource need to be
                 * known to be up before we process resources running in it.
                 */
                check = TRUE;
                crm_trace("Checking node %s/%s/%s status %d/%d/%d", id, rsc->id, rsc->container->id, fence, rsc->role, RSC_ROLE_STARTED);

            } else if (!pe__is_guest_node(this_node)
                       && ((rsc->role == RSC_ROLE_STARTED)
                           || pcmk_is_set(data_set->flags, pe_flag_shutdown_lock))) {
                check = TRUE;
                crm_trace("Checking node %s/%s status %d/%d/%d", id, rsc->id, fence, rsc->role, RSC_ROLE_STARTED);
            }

            if (check) {
                determine_remote_online_status(data_set, this_node);
                unpack_handle_remote_attrs(this_node, state, data_set);
                process = TRUE;
            }

        } else if (this_node->details->online) {
            process = TRUE;

        } else if (fence) {
            process = TRUE;

        } else if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
            process = TRUE;
        }

        if (process) {
            crm_trace("Processing lrm resource entries on %shealthy%s node: %s",
                      fence?"un":"",
                      (pe__is_guest_or_remote_node(this_node)? " remote" : ""),
                      this_node->details->uname);
            changed = TRUE;
            this_node->details->unpacked = TRUE;

            lrm_rsc = find_xml_node(state, XML_CIB_TAG_LRM, FALSE);
            lrm_rsc = find_xml_node(lrm_rsc, XML_LRM_TAG_RESOURCES, FALSE);
            unpack_lrm_resources(this_node, lrm_rsc, data_set);
        }
    }
    return changed;
}

/* Remove nodes that are down or stopping */
/* Create positive rsc_to_node constraints between resources and the nodes
 * they are running on */
/* Anything else? */
gboolean
unpack_status(xmlNode * status, pe_working_set_t * data_set)
{
    const char *id = NULL;
    const char *uname = NULL;

    xmlNode *state = NULL;
    pe_node_t *this_node = NULL;

    crm_trace("Beginning unpack");

    if (data_set->tickets == NULL) {
        data_set->tickets = g_hash_table_new_full(crm_str_hash, g_str_equal,
                                                  free, destroy_ticket);
    }

    for (state = pcmk__xe_first_child(status); state != NULL;
         state = pcmk__xe_next(state)) {

        if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, pcmk__str_none)) {
            unpack_tickets_state((xmlNode *) state, data_set);

        } else if (pcmk__str_eq((const char *)state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
            xmlNode *attrs = NULL;
            const char *resource_discovery_enabled = NULL;

            id = crm_element_value(state, XML_ATTR_ID);
            uname = crm_element_value(state, XML_ATTR_UNAME);
            this_node = pe_find_node_any(data_set->nodes, id, uname);

            if (uname == NULL) {
                /* error */
                continue;

            } else if (this_node == NULL) {
                pcmk__config_warn("Ignoring recorded node status for '%s' "
                                  "because no longer in configuration", uname);
                continue;

            } else if (pe__is_guest_or_remote_node(this_node)) {
                /* The online state of remote nodes is determined by the
                 * connection resource's state after all unpacking is done.
                 * However, we do need to mark whether the node has been
                 * fenced, as this plays a role while unpacking cluster node
                 * resource state.
                 */
                this_node->details->remote_was_fenced =
                    crm_atoi(crm_element_value(state, XML_NODE_IS_FENCED), "0");
                continue;
            }

            crm_trace("Processing node id=%s, uname=%s", id, uname);

            /* Mark the node as provisionally clean
             * - at least we have seen it in the current cluster's lifetime
             */
            this_node->details->unclean = FALSE;
            this_node->details->unseen = FALSE;
            attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
            add_node_attrs(attrs, this_node, TRUE, data_set);

            if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
                crm_info("Node %s is in standby-mode", this_node->details->uname);
                this_node->details->standby = TRUE;
            }

            if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance"))) {
                crm_info("Node %s is in maintenance-mode", this_node->details->uname);
                this_node->details->maintenance = TRUE;
            }

            resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
            if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
                crm_warn("Ignoring %s attribute on node %s because disabling "
                         "resource discovery is not allowed on cluster nodes",
                         XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
1201             }
1202 
1203             crm_trace("determining node state");
1204             determine_online_status(state, this_node, data_set);
1205 
1206             if (!pcmk_is_set(data_set->flags, pe_flag_have_quorum)
1207                 && this_node->details->online
1208                 && (data_set->no_quorum_policy == no_quorum_suicide)) {
1209                 /* Everything else should flow from this automatically
1210                  * (at least until the scheduler becomes able to migrate off
1211                  * healthy resources)
1212                  */
1213                 pe_fence_node(data_set, this_node, "cluster does not have quorum", FALSE);
1214             }
1215         }
1216     }
1217 
1218 
1219     while(unpack_node_loop(status, FALSE, data_set)) {
1220         crm_trace("Start another loop");
1221     }
1222 
1223     // Now catch any nodes we didn't see
1224     unpack_node_loop(status,
1225                      pcmk_is_set(data_set->flags, pe_flag_stonith_enabled),
1226                      data_set);
1227 
1228     /* Now that we know where resources are, we can schedule stops of containers
1229      * with failed bundle connections
1230      */
1231     if (data_set->stop_needed != NULL) {
1232         for (GList *item = data_set->stop_needed; item; item = item->next) {
1233             pe_resource_t *container = item->data;
1234             pe_node_t *node = pe__current_node(container);
1235 
1236             if (node) {
1237                 stop_action(container, node, FALSE);
1238             }
1239         }
1240         g_list_free(data_set->stop_needed);
1241         data_set->stop_needed = NULL;
1242     }
1243 
1244     for (GListPtr gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
1245         pe_node_t *this_node = gIter->data;
1246 
1247         if (this_node == NULL) {
1248             continue;
1249         } else if (!pe__is_guest_or_remote_node(this_node)) {
1250             continue;
1251         } else if (this_node->details->unpacked) {
1252             continue;
1253         }
1254         determine_remote_online_status(data_set, this_node);
1255     }
1256 
1257     return TRUE;
1258 }
1259 
1260 static gboolean
1261 determine_online_status_no_fencing(pe_working_set_t * data_set, xmlNode * node_state,
1262                                    pe_node_t * this_node)
1263 {
1264     gboolean online = FALSE;
1265     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1266     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1267     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1268     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1269 
1270     if (!crm_is_true(in_cluster)) {
1271         crm_trace("Node is down: in_cluster=%s", crm_str(in_cluster));
1272 
1273     } else if (pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei)) {
1274         if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1275             online = TRUE;
1276         } else {
1277             crm_debug("Node is not ready to run resources: %s", join);
1278         }
1279 
1280     } else if (this_node->details->expected_up == FALSE) {
1281         crm_trace("Controller is down: in_cluster=%s", crm_str(in_cluster));
1282         crm_trace("\tis_peer=%s, join=%s, expected=%s",
1283                   crm_str(is_peer), crm_str(join), crm_str(exp_state));
1284 
1285     } else {
1286         /* mark it unclean */
1287         pe_fence_node(data_set, this_node, "peer is unexpectedly down", FALSE);
1288         crm_info("\tin_cluster=%s, is_peer=%s, join=%s, expected=%s",
1289                  crm_str(in_cluster), crm_str(is_peer), crm_str(join), crm_str(exp_state));
1290     }
1291     return online;
1292 }
1293 
1294 static gboolean
1295 determine_online_status_fencing(pe_working_set_t * data_set, xmlNode * node_state,
1296                                 pe_node_t * this_node)
1297 {
1298     gboolean online = FALSE;
1299     gboolean do_terminate = FALSE;
1300     bool crmd_online = FALSE;
1301     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1302     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1303     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1304     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1305     const char *terminate = pe_node_attribute_raw(this_node, "terminate");
1306 
1307 /*
1308   - XML_NODE_IN_CLUSTER    ::= true|false
1309   - XML_NODE_IS_PEER       ::= online|offline
1310   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
1311   - XML_NODE_EXPECTED      ::= member|down
1312 */
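     /* Illustrative example (not part of the original source): a node_state
      * entry for a healthy, joined member might look like
      *
      *   <node_state uname="node1" in_ccm="true" crmd="online"
      *               join="member" expected="member"/>
      *
      * assuming the macros above expand to their usual attribute names.
      */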
1313 
1314     if (crm_is_true(terminate)) {
1315         do_terminate = TRUE;
1316 
1317     } else if (terminate != NULL && strlen(terminate) > 0) {
1318         /* could be a time() value */
1319         char t = terminate[0];
1320 
1321         if (t != '0' && isdigit(t)) {
1322             do_terminate = TRUE;
1323         }
1324     }
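         /* Illustrative values (hypothetical): terminate="true" and
          * terminate="1493379356" (a time() stamp) both request termination,
          * while terminate="0", terminate="false", or an unset attribute
          * do not.
          */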
1325 
1326     crm_trace("%s: in_cluster=%s, is_peer=%s, join=%s, expected=%s, term=%d",
1327               this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1328               crm_str(join), crm_str(exp_state), do_terminate);
1329 
1330     online = crm_is_true(in_cluster);
1331     crmd_online = pcmk__str_eq(is_peer, ONLINESTATUS, pcmk__str_casei);
1332     if (exp_state == NULL) {
1333         exp_state = CRMD_JOINSTATE_DOWN;
1334     }
1335 
1336     if (this_node->details->shutdown) {
1337         crm_debug("%s is shutting down", this_node->details->uname);
1338 
1339         /* Slightly different criteria since we can't shut down a dead peer */
1340         online = crmd_online;
1341 
1342     } else if (in_cluster == NULL) {
1343         pe_fence_node(data_set, this_node, "peer has not been seen by the cluster", FALSE);
1344 
1345     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_NACK, pcmk__str_casei)) {
1346         pe_fence_node(data_set, this_node, "peer failed the pacemaker membership criteria", FALSE);
1347 
1348     } else if (do_terminate == FALSE && pcmk__str_eq(exp_state, CRMD_JOINSTATE_DOWN, pcmk__str_casei)) {
1349 
1350         if (crm_is_true(in_cluster) || crmd_online) {
1351             crm_info("- Node %s is not ready to run resources", this_node->details->uname);
1352             this_node->details->standby = TRUE;
1353             this_node->details->pending = TRUE;
1354 
1355         } else {
1356             crm_trace("%s is down or still coming up", this_node->details->uname);
1357         }
1358 
1359     } else if (do_terminate && pcmk__str_eq(join, CRMD_JOINSTATE_DOWN, pcmk__str_casei)
1360                && crm_is_true(in_cluster) == FALSE && !crmd_online) {
1361         crm_info("Node %s was just shot", this_node->details->uname);
1362         online = FALSE;
1363 
1364     } else if (crm_is_true(in_cluster) == FALSE) {
1365         // Consider `priority-fencing-delay` for lost nodes
1366         pe_fence_node(data_set, this_node, "peer is no longer part of the cluster", TRUE);
1367 
1368     } else if (!crmd_online) {
1369         pe_fence_node(data_set, this_node, "peer process is no longer available", FALSE);
1370 
1371         /* Everything is running at this point, now check join state */
1372     } else if (do_terminate) {
1373         pe_fence_node(data_set, this_node, "termination was requested", FALSE);
1374 
1375     } else if (pcmk__str_eq(join, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1376         crm_info("Node %s is active", this_node->details->uname);
1377 
1378     } else if (pcmk__strcase_any_of(join, CRMD_JOINSTATE_PENDING, CRMD_JOINSTATE_DOWN, NULL)) {
1379         crm_info("Node %s is not ready to run resources", this_node->details->uname);
1380         this_node->details->standby = TRUE;
1381         this_node->details->pending = TRUE;
1382 
1383     } else {
1384         pe_fence_node(data_set, this_node, "peer was in an unknown state", FALSE);
1385         crm_warn("%s: in-cluster=%s, is-peer=%s, join=%s, expected=%s, term=%d, shutdown=%d",
1386                  this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1387                  crm_str(join), crm_str(exp_state), do_terminate, this_node->details->shutdown);
1388     }
1389 
1390     return online;
1391 }
1392 
1393 static void
1394 determine_remote_online_status(pe_working_set_t * data_set, pe_node_t * this_node)
1395 {
1396     pe_resource_t *rsc = this_node->details->remote_rsc;
1397     pe_resource_t *container = NULL;
1398     pe_node_t *host = NULL;
1399 
1400     /* If there is a node state entry for a (former) Pacemaker Remote node
1401      * but no resource creating that node, the node's connection resource will
1402      * be NULL. Consider it an offline remote node in that case.
1403      */
1404     if (rsc == NULL) {
1405         this_node->details->online = FALSE;
1406         goto remote_online_done;
1407     }
1408 
1409     container = rsc->container;
1410 
1411     if (container && pcmk__list_of_1(rsc->running_on)) {
1412         host = rsc->running_on->data;
1413     }
1414 
1415     /* If the resource is currently started, mark it online. */
1416     if (rsc->role == RSC_ROLE_STARTED) {
1417         crm_trace("%s node %s presumed ONLINE because connection resource is started",
1418                   (container? "Guest" : "Remote"), this_node->details->id);
1419         this_node->details->online = TRUE;
1420     }
1421 
1422     /* consider this node shutting down if transitioning start->stop */
1423     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
1424         crm_trace("%s node %s shutting down because connection resource is stopping",
1425                   (container? "Guest" : "Remote"), this_node->details->id);
1426         this_node->details->shutdown = TRUE;
1427     }
1428 
1429     /* Now check all the failure conditions. */
1430     if (container && pcmk_is_set(container->flags, pe_rsc_failed)) {
1431         crm_trace("Guest node %s UNCLEAN because guest resource failed",
1432                   this_node->details->id);
1433         this_node->details->online = FALSE;
1434         this_node->details->remote_requires_reset = TRUE;
1435 
1436     } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
1437         crm_trace("%s node %s OFFLINE because connection resource failed",
1438                   (container? "Guest" : "Remote"), this_node->details->id);
1439         this_node->details->online = FALSE;
1440 
1441     } else if (rsc->role == RSC_ROLE_STOPPED
1442         || (container && container->role == RSC_ROLE_STOPPED)) {
1443 
1444         crm_trace("%s node %s OFFLINE because its resource is stopped",
1445                   (container? "Guest" : "Remote"), this_node->details->id);
1446         this_node->details->online = FALSE;
1447         this_node->details->remote_requires_reset = FALSE;
1448 
1449     } else if (host && (host->details->online == FALSE)
1450                && host->details->unclean) {
1451         crm_trace("Guest node %s UNCLEAN because host is unclean",
1452                   this_node->details->id);
1453         this_node->details->online = FALSE;
1454         this_node->details->remote_requires_reset = TRUE;
1455     }
1456 
1457 remote_online_done:
1458     crm_trace("Remote node %s online=%s",
1459         this_node->details->id, this_node->details->online ? "TRUE" : "FALSE");
1460 }
1461 
1462 static void
1463 determine_online_status(xmlNode * node_state, pe_node_t * this_node, pe_working_set_t * data_set)
1464 {
1465     gboolean online = FALSE;
1466     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1467 
1468     CRM_CHECK(this_node != NULL, return);
1469 
1470     this_node->details->shutdown = FALSE;
1471     this_node->details->expected_up = FALSE;
1472 
1473     if (pe__shutdown_requested(this_node)) {
1474         this_node->details->shutdown = TRUE;
1475 
1476     } else if (pcmk__str_eq(exp_state, CRMD_JOINSTATE_MEMBER, pcmk__str_casei)) {
1477         this_node->details->expected_up = TRUE;
1478     }
1479 
1480     if (this_node->details->type == node_ping) {
1481         this_node->details->unclean = FALSE;
1482         online = FALSE;         /* As far as resource management is concerned,
1483                                  * the node is safely offline.
1484                                  * Anyone caught abusing this logic will be shot
1485                                  */
1486 
1487     } else if (!pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1488         online = determine_online_status_no_fencing(data_set, node_state, this_node);
1489 
1490     } else {
1491         online = determine_online_status_fencing(data_set, node_state, this_node);
1492     }
1493 
1494     if (online) {
1495         this_node->details->online = TRUE;
1496 
1497     } else {
1498         /* remove node from contention */
1499         this_node->fixed = TRUE;
1500         this_node->weight = -INFINITY;
1501     }
1502 
1503     if (online && this_node->details->shutdown) {
1504         /* don't run resources here */
1505         this_node->fixed = TRUE;
1506         this_node->weight = -INFINITY;
1507     }
1508 
1509     if (this_node->details->type == node_ping) {
1510         crm_info("Node %s is not a pacemaker node", this_node->details->uname);
1511 
1512     } else if (this_node->details->unclean) {
1513         pe_proc_warn("Node %s is unclean", this_node->details->uname);
1514 
1515     } else if (this_node->details->online) {
1516         crm_info("Node %s is %s", this_node->details->uname,
1517                  this_node->details->shutdown ? "shutting down" :
1518                  this_node->details->pending ? "pending" :
1519                  this_node->details->standby ? "standby" :
1520                  this_node->details->maintenance ? "maintenance" : "online");
1521 
1522     } else {
1523         crm_trace("Node %s is offline", this_node->details->uname);
1524     }
1525 }
1526 
1527 /*!
1528  * \internal
1529  * \brief Find the end of a resource's name, excluding any clone suffix
1530  *
1531  * \param[in] id  Resource ID to check
1532  *
1533  * \return Pointer to last character of resource's base name
1534  */
1535 const char *
1536 pe_base_name_end(const char *id)
1537 {
1538     if (!pcmk__str_empty(id)) {
1539         const char *end = id + strlen(id) - 1;
1540 
1541         for (const char *s = end; s > id; --s) {
1542             switch (*s) {
1543                 case '0':
1544                 case '1':
1545                 case '2':
1546                 case '3':
1547                 case '4':
1548                 case '5':
1549                 case '6':
1550                 case '7':
1551                 case '8':
1552                 case '9':
1553                     break;
1554                 case ':':
1555                     return (s == end)? s : (s - 1);
1556                 default:
1557                     return end;
1558             }
1559         }
1560         return end;
1561     }
1562     return NULL;
1563 }
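     /* Illustrative usage (hypothetical IDs, not from the original source):
      * pe_base_name_end("myclone:32") returns a pointer to the final 'e' of
      * "myclone", while pe_base_name_end("myclone") returns a pointer to its
      * last character unchanged.
      */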
1564 
1565 /*!
1566  * \internal
1567  * \brief Get a resource name excluding any clone suffix
1568  *
1569  * \param[in] last_rsc_id  Resource ID to check
1570  *
1571  * \return Pointer to newly allocated string with resource's base name
1572  * \note It is the caller's responsibility to free() the result.
1573  *       This asserts on error, so callers can assume result is not NULL.
1574  */
1575 char *
1576 clone_strip(const char *last_rsc_id)
1577 {
1578     const char *end = pe_base_name_end(last_rsc_id);
1579     char *basename = NULL;
1580 
1581     CRM_ASSERT(end);
1582     basename = strndup(last_rsc_id, end - last_rsc_id + 1);
1583     CRM_ASSERT(basename);
1584     return basename;
1585 }
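     /* Illustrative usage (hypothetical IDs): clone_strip("myclone:32") and
      * clone_strip("myclone") both return a newly allocated "myclone".
      */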
1586 
1587 /*!
1588  * \internal
1589  * \brief Get the name of the first instance of a cloned resource
1590  *
1591  * \param[in] last_rsc_id  Resource ID to check
1592  *
1593  * \return Pointer to newly allocated string with resource's base name plus :0
1594  * \note It is the caller's responsibility to free() the result.
1595  *       This asserts on error, so callers can assume result is not NULL.
1596  */
1597 char *
1598 clone_zero(const char *last_rsc_id)
1599 {
1600     const char *end = pe_base_name_end(last_rsc_id);
1601     size_t base_name_len = end - last_rsc_id + 1;
1602     char *zero = NULL;
1603 
1604     CRM_ASSERT(end);
1605     zero = calloc(base_name_len + 3, sizeof(char));
1606     CRM_ASSERT(zero);
1607     memcpy(zero, last_rsc_id, base_name_len);
1608     zero[base_name_len] = ':';
1609     zero[base_name_len + 1] = '0';
1610     return zero;
1611 }
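     /* Illustrative usage (hypothetical IDs): clone_zero("myclone:32") and
      * clone_zero("myclone") both return a newly allocated "myclone:0".
      */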
1612 
1613 static pe_resource_t *
1614 create_fake_resource(const char *rsc_id, xmlNode * rsc_entry, pe_working_set_t * data_set)
1615 {
1616     pe_resource_t *rsc = NULL;
1617     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
1618 
1619     copy_in_properties(xml_rsc, rsc_entry);
1620     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
1621     crm_log_xml_debug(xml_rsc, "Orphan resource");
1622 
1623     if (!common_unpack(xml_rsc, &rsc, NULL, data_set)) {
1624         return NULL;
1625     }
1626 
1627     if (xml_contains_remote_node(xml_rsc)) {
1628         pe_node_t *node;
1629 
1630         crm_debug("Detected orphaned remote node %s", rsc_id);
1631         node = pe_find_node(data_set->nodes, rsc_id);
1632         if (node == NULL) {
1633             node = pe_create_node(rsc_id, rsc_id, "remote", NULL, data_set);
1634         }
1635         link_rsc2remotenode(data_set, rsc);
1636 
1637         if (node) {
1638             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
1639             node->details->shutdown = TRUE;
1640         }
1641     }
1642 
1643     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
1644         /* This orphaned rsc needs to be mapped to a container. */
1645         crm_trace("Detected orphaned container filler %s", rsc_id);
1646         pe__set_resource_flags(rsc, pe_rsc_orphan_container_filler);
1647     }
1648     pe__set_resource_flags(rsc, pe_rsc_orphan);
1649     data_set->resources = g_list_append(data_set->resources, rsc);
1650     return rsc;
1651 }
1652 
1653 /*!
1654  * \internal
1655  * \brief Create orphan instance for anonymous clone resource history
1656  */
1657 static pe_resource_t *
1658 create_anonymous_orphan(pe_resource_t *parent, const char *rsc_id,
1659                         pe_node_t *node, pe_working_set_t *data_set)
1660 {
1661     pe_resource_t *top = pe__create_clone_child(parent, data_set);
1662 
1663     // find_rsc() because we might be a cloned group
1664     pe_resource_t *orphan = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
1665 
1666     pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s",
1667                  top->id, parent->id, rsc_id, node->details->uname);
1668     return orphan;
1669 }
1670 
1671 /*!
1672  * \internal
1673  * \brief Check a node for an instance of an anonymous clone
1674  *
1675  * Return a child instance of the specified anonymous clone, in order of
1676  * preference: (1) the instance running on the specified node, if any;
1677  * (2) an inactive instance (i.e. within the total of clone-max instances);
1678  * (3) a newly created orphan (i.e. clone-max instances are already active).
1679  *
1680  * \param[in] data_set  Cluster information
1681  * \param[in] node      Node on which to check for instance
1682  * \param[in] parent    Clone to check
1683  * \param[in] rsc_id    Name of cloned resource in history (without instance)
1684  */
1685 static pe_resource_t *
1686 find_anonymous_clone(pe_working_set_t * data_set, pe_node_t * node, pe_resource_t * parent,
1687                      const char *rsc_id)
1688 {
1689     GListPtr rIter = NULL;
1690     pe_resource_t *rsc = NULL;
1691     pe_resource_t *inactive_instance = NULL;
1692     gboolean skip_inactive = FALSE;
1693 
1694     CRM_ASSERT(parent != NULL);
1695     CRM_ASSERT(pe_rsc_is_clone(parent));
1696     CRM_ASSERT(!pcmk_is_set(parent->flags, pe_rsc_unique));
1697 
1698     // Check for active (or partially active, for cloned groups) instance
1699     pe_rsc_trace(parent, "Looking for %s on %s in %s", rsc_id, node->details->uname, parent->id);
1700     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1701         GListPtr locations = NULL;
1702         pe_resource_t *child = rIter->data;
1703 
1704         /* Check whether this instance is already known to be active or pending
1705          * anywhere, at this stage of unpacking. Because this function is called
1706          * for a resource before the resource's individual operation history
1707          * entries are unpacked, locations will generally not contain the
1708          * desired node.
1709          *
1710          * However, there are three exceptions:
1711          * (1) when child is a cloned group and we have already unpacked the
1712          *     history of another member of the group on the same node;
1713          * (2) when we've already unpacked the history of another numbered
1714          *     instance on the same node (which can happen if globally-unique
1715          *     was flipped from true to false); and
1716          * (3) when we re-run calculations on the same data set as part of a
1717          *     simulation.
1718          */
1719         child->fns->location(child, &locations, 2);
1720         if (locations) {
1721             /* We should never associate the same numbered anonymous clone
1722              * instance with multiple nodes, and clone instances can't migrate,
1723              * so there must be only one location, regardless of history.
1724              */
1725             CRM_LOG_ASSERT(locations->next == NULL);
1726 
1727             if (((pe_node_t *)locations->data)->details == node->details) {
1728                 /* This child instance is active on the requested node, so check
1729                  * for a corresponding configured resource. We use find_rsc()
1730                  * instead of child because child may be a cloned group, and we
1731                  * need the particular member corresponding to rsc_id.
1732                  *
1733                  * If the history entry is orphaned, rsc will be NULL.
1734                  */
1735                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1736                 if (rsc) {
1737                     /* If there are multiple instance history entries for an
1738                      * anonymous clone in a single node's history (which can
1739                      * happen if globally-unique is switched from true to
1740                      * false), we want to consider the instances beyond the
1741                      * first as orphans, even if there are inactive instance
1742                      * numbers available.
1743                      */
1744                     if (rsc->running_on) {
1745                         crm_notice("Active (now-)anonymous clone %s has "
1746                                    "multiple (orphan) instance histories on %s",
1747                                    parent->id, node->details->uname);
1748                         skip_inactive = TRUE;
1749                         rsc = NULL;
1750                     } else {
1751                         pe_rsc_trace(parent, "Resource %s, active", rsc->id);
1752                     }
1753                 }
1754             }
1755             g_list_free(locations);
1756 
1757         } else {
1758             pe_rsc_trace(parent, "Resource %s, skip inactive", child->id);
1759             if (!skip_inactive && !inactive_instance
1760                 && !pcmk_is_set(child->flags, pe_rsc_block)) {
1761                 // Remember one inactive instance in case we don't find active
1762                 inactive_instance = parent->fns->find_rsc(child, rsc_id, NULL,
1763                                                           pe_find_clone);
1764 
1765                 /* ... but don't use it if it was already associated with a
1766                  * pending action on another node
1767                  */
1768                 if (inactive_instance && inactive_instance->pending_node
1769                     && (inactive_instance->pending_node->details != node->details)) {
1770                     inactive_instance = NULL;
1771                 }
1772             }
1773         }
1774     }
1775 
1776     if ((rsc == NULL) && !skip_inactive && (inactive_instance != NULL)) {
1777         pe_rsc_trace(parent, "Resource %s, empty slot", inactive_instance->id);
1778         rsc = inactive_instance;
1779     }
1780 
1781     /* If the resource has "requires" set to "quorum" or "nothing", and we don't
1782      * have a clone instance for every node, we don't want to consume a valid
1783      * instance number for unclean nodes. Such instances may appear to be active
1784      * according to the history, but should be considered inactive, so we can
1785      * start an instance elsewhere. Treat such instances as orphans.
1786      *
1787      * An exception is instances running on guest nodes -- since guest node
1788      * "fencing" is actually just a resource stop, requires shouldn't apply.
1789      *
1790      * @TODO Ideally, we'd use an inactive instance number if it is not needed
1791      * for any clean instances. However, we don't know that at this point.
1792      */
1793     if ((rsc != NULL) && !pcmk_is_set(rsc->flags, pe_rsc_needs_fencing)
1794         && (!node->details->online || node->details->unclean)
1795         && !pe__is_guest_node(node)
1796         && !pe__is_universal_clone(parent, data_set)) {
1797 
1798         rsc = NULL;
1799     }
1800 
1801     if (rsc == NULL) {
1802         rsc = create_anonymous_orphan(parent, rsc_id, node, data_set);
1803         pe_rsc_trace(parent, "Resource %s, orphan", rsc->id);
1804     }
1805     return rsc;
1806 }
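     /* Illustrative walk-through (hypothetical, not from the original
      * source): if anonymous clone C has children C:0 (active on node1) and
      * C:1 (inactive), a history entry for C on node1 maps to C:0, an entry
      * on node2 maps to the inactive C:1, and once C:1 is taken, an entry on
      * node3 gets a newly created orphan C:2.
      */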
1807 
1808 static pe_resource_t *
1809 unpack_find_resource(pe_working_set_t * data_set, pe_node_t * node, const char *rsc_id,
1810                      xmlNode * rsc_entry)
1811 {
1812     pe_resource_t *rsc = NULL;
1813     pe_resource_t *parent = NULL;
1814 
1815     crm_trace("looking for %s", rsc_id);
1816     rsc = pe_find_resource(data_set->resources, rsc_id);
1817 
1818     if (rsc == NULL) {
1819         /* If we didn't find the resource by its name in the operation history,
1820          * check it again as a clone instance. Even when clone-max=0, we create
1821          * a single :0 orphan to match against here.
1822          */
1823         char *clone0_id = clone_zero(rsc_id);
1824         pe_resource_t *clone0 = pe_find_resource(data_set->resources, clone0_id);
1825 
1826         if (clone0 && !pcmk_is_set(clone0->flags, pe_rsc_unique)) {
1827             rsc = clone0;
1828             parent = uber_parent(clone0);
1829             crm_trace("%s found as %s (%s)", rsc_id, clone0_id, parent->id);
1830         } else {
1831             crm_trace("%s is not known as %s either (orphan)",
1832                       rsc_id, clone0_id);
1833         }
1834         free(clone0_id);
1835 
1836     } else if (rsc->variant > pe_native) {
1837         crm_trace("Resource history for %s is orphaned because it is no longer primitive",
1838                   rsc_id);
1839         return NULL;
1840 
1841     } else {
1842         parent = uber_parent(rsc);
1843     }
1844 
1845     if (pe_rsc_is_anon_clone(parent)) {
1846 
1847         if (pe_rsc_is_bundled(parent)) {
1848             rsc = pe__find_bundle_replica(parent->parent, node);
1849         } else {
1850             char *base = clone_strip(rsc_id);
1851 
1852             rsc = find_anonymous_clone(data_set, node, parent, base);
1853             free(base);
1854             CRM_ASSERT(rsc != NULL);
1855         }
1856     }
1857 
1858     if (rsc && !pcmk__str_eq(rsc_id, rsc->id, pcmk__str_casei)
1859         && !pcmk__str_eq(rsc_id, rsc->clone_name, pcmk__str_casei)) {
1860 
1861         free(rsc->clone_name);
1862         rsc->clone_name = strdup(rsc_id);
1863         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
1864                      rsc_id, node->details->uname, rsc->id,
1865                      (pcmk_is_set(rsc->flags, pe_rsc_orphan)? " (ORPHAN)" : ""));
1866     }
1867     return rsc;
1868 }
1869 
1870 static pe_resource_t *
1871 process_orphan_resource(xmlNode * rsc_entry, pe_node_t * node, pe_working_set_t * data_set)
1872 {
1873     pe_resource_t *rsc = NULL;
1874     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
1875 
1876     crm_debug("Detected orphan resource %s on %s", rsc_id, node->details->uname);
1877     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
1878 
1879     if (!pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)) {
1880         pe__clear_resource_flags(rsc, pe_rsc_managed);
1881 
1882     } else {
1883         CRM_CHECK(rsc != NULL, return NULL);
1884         pe_rsc_trace(rsc, "Added orphan %s", rsc->id);
1885         resource_location(rsc, NULL, -INFINITY, "__orphan_do_not_run__", data_set);
1886     }
1887     return rsc;
1888 }
1889 
1890 static void
1891 process_rsc_state(pe_resource_t * rsc, pe_node_t * node,
1892                   enum action_fail_response on_fail,
1893                   xmlNode * migrate_op, pe_working_set_t * data_set)
1894 {
1895     pe_node_t *tmpnode = NULL;
1896     char *reason = NULL;
1897 
1898     CRM_ASSERT(rsc);
1899     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
1900                  rsc->id, role2text(rsc->role), node->details->uname, fail2text(on_fail));
1901 
1902     /* process current state */
1903     if (rsc->role != RSC_ROLE_UNKNOWN) {
1904         pe_resource_t *iter = rsc;
1905 
1906         while (iter) {
1907             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
1908                 pe_node_t *n = pe__copy_node(node);
1909 
1910                 pe_rsc_trace(rsc, "%s (aka. %s) known on %s", rsc->id, rsc->clone_name,
1911                              n->details->uname);
1912                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
1913             }
1914             if (pcmk_is_set(iter->flags, pe_rsc_unique)) {
1915                 break;
1916             }
1917             iter = iter->parent;
1918         }
1919     }
1920 
1921     /* If a managed resource is believed to be running, but its node is down ... */
1922     if (rsc->role > RSC_ROLE_STOPPED
1923         && node->details->online == FALSE
1924         && node->details->maintenance == FALSE
1925         && pcmk_is_set(rsc->flags, pe_rsc_managed)) {
1926 
1927         gboolean should_fence = FALSE;
1928 
1929         /* If this is a guest node, fence it (regardless of whether fencing is
1930          * enabled, because guest node fencing is done by recovery of the
1931          * container resource rather than by the fencer). Mark the resource
1932          * we're processing as failed. When the guest comes back up, its
1933          * operation history in the CIB will be cleared, freeing the affected
1934          * resource to run again once we are sure we know its state.
1935          */
1936         if (pe__is_guest_node(node)) {
1937             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
1938             should_fence = TRUE;
1939 
1940         } else if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
1941             if (pe__is_remote_node(node) && node->details->remote_rsc
1942                 && !pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
1943 
1944                 /* Setting unseen means that fencing of the remote node will
1945                  * occur only if the connection resource is not going to start
1946                  * somewhere. This allows connection resources on a failed
1947                  * cluster node to move to another node without requiring the
1948                  * remote nodes to be fenced as well.
1949                  */
1950                 node->details->unseen = TRUE;
1951                 reason = crm_strdup_printf("%s is active there (fencing will be"
1952                                            " revoked if remote connection can "
1953                                            "be re-established elsewhere)",
1954                                            rsc->id);
1955             }
1956             should_fence = TRUE;
1957         }
1958 
1959         if (should_fence) {
1960             if (reason == NULL) {
1961                 reason = crm_strdup_printf("%s is thought to be active there", rsc->id);
1962             }
1963             pe_fence_node(data_set, node, reason, FALSE);
1964         }
1965         free(reason);
1966     }
1967 
1968     if (node->details->unclean) {
1969         /* No extra processing needed
1970          * Also allows resources to be started again after a node is shot
1971          */
1972         on_fail = action_fail_ignore;
1973     }
1974 
1975     switch (on_fail) {
1976         case action_fail_ignore:
1977             /* nothing to do */
1978             break;
1979 
1980         case action_fail_demote:
1981             pe__set_resource_flags(rsc, pe_rsc_failed);
1982             demote_action(rsc, node, FALSE);
1983             break;
1984 
1985         case action_fail_fence:
1986             /* treat it as if it is still running
1987              * but also mark the node as unclean
1988              */
1989             reason = crm_strdup_printf("%s failed there", rsc->id);
1990             pe_fence_node(data_set, node, reason, FALSE);
1991             free(reason);
1992             break;
1993 
1994         case action_fail_standby:
1995             node->details->standby = TRUE;
1996             node->details->standby_onfail = TRUE;
1997             break;
1998 
1999         case action_fail_block:
2000             /* is_managed == FALSE will prevent any
2001              * actions being sent for the resource
2002              */
2003             pe__clear_resource_flags(rsc, pe_rsc_managed);
2004             pe__set_resource_flags(rsc, pe_rsc_block);
2005             break;
2006 
2007         case action_fail_migrate:
2008             /* make sure it comes up somewhere else
2009              * or not at all
2010              */
2011             resource_location(rsc, node, -INFINITY, "__action_migration_auto__", data_set);
2012             break;
2013 
2014         case action_fail_stop:
2015             rsc->next_role = RSC_ROLE_STOPPED;
2016             break;
2017 
2018         case action_fail_recover:
2019             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2020                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2021                 stop_action(rsc, node, FALSE);
2022             }
2023             break;
2024 
2025         case action_fail_restart_container:
2026             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2027             if (rsc->container && pe_rsc_is_bundled(rsc)) {
2028                 /* A bundle's remote connection can run on a different node than
2029                  * the bundle's container. We don't necessarily know where the
2030                  * container is running yet, so remember it and add a stop
2031                  * action for it later.
2032                  */
2033                 data_set->stop_needed = g_list_prepend(data_set->stop_needed,
2034                                                        rsc->container);
2035             } else if (rsc->container) {
2036                 stop_action(rsc->container, node, FALSE);
2037             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2038                 stop_action(rsc, node, FALSE);
2039             }
2040             break;
2041 
2042         case action_fail_reset_remote:
2043             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2044             if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
2045                 tmpnode = NULL;
2046                 if (rsc->is_remote_node) {
2047                     tmpnode = pe_find_node(data_set->nodes, rsc->id);
2048                 }
2049                 if (tmpnode &&
2050                     pe__is_remote_node(tmpnode) &&
2051                     tmpnode->details->remote_was_fenced == 0) {
2052 
2053                     /* The remote connection resource failed in a way that
2054                      * should result in fencing the remote node.
2055                      */
2056                     pe_fence_node(data_set, tmpnode,
2057                                   "remote connection is unrecoverable", FALSE);
2058                 }
2059             }
2060 
2061             /* Require the stop action regardless of whether fencing is occurring. */
2062             if (rsc->role > RSC_ROLE_STOPPED) {
2063                 stop_action(rsc, node, FALSE);
2064             }
2065 
2066             /* if reconnect delay is in use, prevent the connection from exiting the
2067              * "STOPPED" role until the failure is cleared by the delay timeout. */
2068             if (rsc->remote_reconnect_ms) {
2069                 rsc->next_role = RSC_ROLE_STOPPED;
2070             }
2071             break;
2072     }
2073 
2074     /* Ensure a remote node connection failure forces an unclean remote node
2075      * to be fenced. By setting unseen = FALSE, the remote node failure will
2076      * result in a fencing operation regardless of whether we attempt to
2077      * reconnect to the remote node in this transition. */
2078     if (pcmk_is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
2079         tmpnode = pe_find_node(data_set->nodes, rsc->id);
2080         if (tmpnode && tmpnode->details->unclean) {
2081             tmpnode->details->unseen = FALSE;
2082         }
2083     }
2084 
2085     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2086         if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
2087             if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
2088                 pcmk__config_warn("Detected active orphan %s running on %s",
2089                                   rsc->id, node->details->uname);
2090             } else {
2091                 pcmk__config_warn("Resource '%s' must be stopped manually on "
2092                                   "%s because cluster is configured not to "
2093                                   "stop active orphans",
2094                                   rsc->id, node->details->uname);
2095             }
2096         }
2097 
2098         native_add_running(rsc, node, data_set);
2099         switch (on_fail) {
2100             case action_fail_ignore:
2101                 break;
2102             case action_fail_demote:
2103             case action_fail_block:
2104                 pe__set_resource_flags(rsc, pe_rsc_failed);
2105                 break;
2106             default:
2107                 pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2108                 break;
2109         }
2110 
2111     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
2112         /* Only do this for older status sections that included instance numbers.
2113          * Otherwise, stopped instances will appear as orphans.
2114          */
2115         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
2116         free(rsc->clone_name);
2117         rsc->clone_name = NULL;
2118 
2119     } else {
2120         GList *possible_matches = pe__resource_actions(rsc, node, RSC_STOP,
2121                                                        FALSE);
2122         GListPtr gIter = possible_matches;
2123 
2124         for (; gIter != NULL; gIter = gIter->next) {
2125             pe_action_t *stop = (pe_action_t *) gIter->data;
2126 
2127             pe__set_action_flags(stop, pe_action_optional);
2128         }
2129 
2130         g_list_free(possible_matches);
2131     }
2132 }
2133 
2134 /* create active recurring operations as optional */
2135 static void
2136 process_recurring(pe_node_t * node, pe_resource_t * rsc,
2137                   int start_index, int stop_index,
2138                   GListPtr sorted_op_list, pe_working_set_t * data_set)
2139 {
2140     int counter = -1;
2141     const char *task = NULL;
2142     const char *status = NULL;
2143     GListPtr gIter = sorted_op_list;
2144 
2145     CRM_ASSERT(rsc);
2146     pe_rsc_trace(rsc, "%s: Start index %d, stop index = %d", rsc->id, start_index, stop_index);
2147 
2148     for (; gIter != NULL; gIter = gIter->next) {
2149         xmlNode *rsc_op = (xmlNode *) gIter->data;
2150 
2151         guint interval_ms = 0;
2152         char *key = NULL;
2153         const char *id = ID(rsc_op);
2154 
2155         counter++;
2156 
2157         if (node->details->online == FALSE) {
2158             pe_rsc_trace(rsc, "Skipping %s/%s: node is offline", rsc->id, node->details->uname);
2159             break;
2160 
2161             /* Need to check if there's a monitor for role="Stopped" */
2162         } else if (start_index < stop_index && counter <= stop_index) {
2163             pe_rsc_trace(rsc, "Skipping %s/%s: resource is not active", id, node->details->uname);
2164             continue;
2165 
2166         } else if (counter < start_index) {
2167             pe_rsc_trace(rsc, "Skipping %s/%s: old %d", id, node->details->uname, counter);
2168             continue;
2169         }
2170 
2171         crm_element_value_ms(rsc_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
2172         if (interval_ms == 0) {
2173             pe_rsc_trace(rsc, "Skipping %s/%s: non-recurring", id, node->details->uname);
2174             continue;
2175         }
2176 
2177         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2178         if (pcmk__str_eq(status, "-1", pcmk__str_casei)) {
2179             pe_rsc_trace(rsc, "Skipping %s/%s: status", id, node->details->uname);
2180             continue;
2181         }
2182         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2183         /* create the action */
2184         key = pcmk__op_key(rsc->id, task, interval_ms);
2185         pe_rsc_trace(rsc, "Creating %s/%s", key, node->details->uname);
2186         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
2187     }
2188 }
2189 
2190 void
2191 calculate_active_ops(GListPtr sorted_op_list, int *start_index, int *stop_index)
2192 {
2193     int counter = -1;
2194     int implied_monitor_start = -1;
2195     int implied_clone_start = -1;
2196     const char *task = NULL;
2197     const char *status = NULL;
2198     GListPtr gIter = sorted_op_list;
2199 
2200     *stop_index = -1;
2201     *start_index = -1;
2202 
2203     for (; gIter != NULL; gIter = gIter->next) {
2204         xmlNode *rsc_op = (xmlNode *) gIter->data;
2205 
2206         counter++;
2207 
2208         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2209         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2210 
2211         if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)
2212             && pcmk__str_eq(status, "0", pcmk__str_casei)) {
2213             *stop_index = counter;
2214 
2215         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_START, CRMD_ACTION_MIGRATED, NULL)) {
2216             *start_index = counter;
2217 
2218         } else if ((implied_monitor_start <= *stop_index) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
2219             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
2220 
2221             if (pcmk__strcase_any_of(rc, "0", "8", NULL)) {
2222                 implied_monitor_start = counter;
2223             }
2224         } else if (pcmk__strcase_any_of(task, CRMD_ACTION_PROMOTE, CRMD_ACTION_DEMOTE, NULL)) {
2225             implied_clone_start = counter;
2226         }
2227     }
2228 
2229     if (*start_index == -1) {
2230         if (implied_clone_start != -1) {
2231             *start_index = implied_clone_start;
2232         } else if (implied_monitor_start != -1) {
2233             *start_index = implied_monitor_start;
2234         }
2235     }
2236 }
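     /* Illustrative example (hypothetical history): for the sorted list
      *   0: start, 1: monitor (rc=0), 2: successful stop, 3: start
      * this yields *stop_index = 2 and *start_index = 3, so only the final
      * start is treated as part of the current active period.
      */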
2237 
2238 // If resource history entry has shutdown lock, remember lock node and time
2239 static void
2240 unpack_shutdown_lock(xmlNode *rsc_entry, pe_resource_t *rsc, pe_node_t *node,
2241                      pe_working_set_t *data_set)
2242 {
2243     time_t lock_time = 0;   // When lock started (i.e. node shutdown time)
2244 
2245     if ((crm_element_value_epoch(rsc_entry, XML_CONFIG_ATTR_SHUTDOWN_LOCK,
2246                                  &lock_time) == pcmk_ok) && (lock_time != 0)) {
2247 
2248         if ((data_set->shutdown_lock > 0)
2249             && (get_effective_time(data_set)
2250                 > (lock_time + data_set->shutdown_lock))) {
2251             pe_rsc_info(rsc, "Shutdown lock for %s on %s expired",
2252                         rsc->id, node->details->uname);
2253             pe__clear_resource_history(rsc, node, data_set);
2254         } else {
2255             rsc->lock_node = node;
2256             rsc->lock_time = lock_time;
2257         }
2258     }
2259 }
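     /* Illustrative example (hypothetical values): with a configured
      * shutdown lock limit of 600s, a lock timestamp of epoch 1600000000 in
      * the history entry keeps the resource locked to this node until the
      * effective time passes 1600000600; after that, the lock is treated as
      * expired and the resource's history on the node is cleared.
      */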
2260 
2261 static pe_resource_t *
2262 unpack_lrm_rsc_state(pe_node_t * node, xmlNode * rsc_entry, pe_working_set_t * data_set)
2263 {
2264     GListPtr gIter = NULL;
2265     int stop_index = -1;
2266     int start_index = -1;
2267     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
2268 
2269     const char *task = NULL;
2270     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2271 
2272     pe_resource_t *rsc = NULL;
2273     GListPtr op_list = NULL;
2274     GListPtr sorted_op_list = NULL;
2275 
2276     xmlNode *migrate_op = NULL;
2277     xmlNode *rsc_op = NULL;
2278     xmlNode *last_failure = NULL;
2279 
2280     enum action_fail_response on_fail = action_fail_ignore;
2281     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
2282 
2283     crm_trace("[%s] Processing %s on %s",
2284               crm_element_name(rsc_entry), rsc_id, node->details->uname);
2285 
2286     /* extract operations */
2287     op_list = NULL;
2288     sorted_op_list = NULL;
2289 
2290     for (rsc_op = pcmk__xe_first_child(rsc_entry); rsc_op != NULL;
2291          rsc_op = pcmk__xe_next(rsc_op)) {
2292 
2293         if (pcmk__str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP,
2294                          pcmk__str_none)) {
2295             op_list = g_list_prepend(op_list, rsc_op);
2296         }
2297     }
2298 
2299     if (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2300         if (op_list == NULL) {
2301             // If there are no operations, there is nothing to do
2302             return NULL;
2303         }
2304     }
2305 
2306     /* find the resource */
2307     rsc = unpack_find_resource(data_set, node, rsc_id, rsc_entry);
2308     if (rsc == NULL) {
2309         if (op_list == NULL) {
2310             // If there are no operations, there is nothing to do
2311             return NULL;
2312         } else {
2313             rsc = process_orphan_resource(rsc_entry, node, data_set);
2314         }
2315     }
2316     CRM_ASSERT(rsc != NULL);
2317 
2318     // Check whether the resource is "shutdown-locked" to this node
2319     if (pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
2320         unpack_shutdown_lock(rsc_entry, rsc, node, data_set);
2321     }
2322 
2323     /* process operations */
2324     saved_role = rsc->role;
2325     rsc->role = RSC_ROLE_UNKNOWN;
2326     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
2327 
2328     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
2329         xmlNode *rsc_op = (xmlNode *) gIter->data;
2330 
2331         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2332         if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2333             migrate_op = rsc_op;
2334         }
2335 
2336         unpack_rsc_op(rsc, node, rsc_op, &last_failure, &on_fail, data_set);
2337     }
2338 
2339     /* create active recurring operations as optional */
2340     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
2341     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
2342 
2343     /* no need to free the contents */
2344     g_list_free(sorted_op_list);
2345 
2346     process_rsc_state(rsc, node, on_fail, migrate_op, data_set);
2347 
2348     if (get_target_role(rsc, &req_role)) {
2349         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
2350             pe_rsc_debug(rsc, "%s: Overwriting calculated next role %s"
2351                          " with requested next role %s",
2352                          rsc->id, role2text(rsc->next_role), role2text(req_role));
2353             rsc->next_role = req_role;
2354 
2355         } else if (req_role > rsc->next_role) {
2356             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
2357                         " with requested next role %s",
2358                         rsc->id, role2text(rsc->next_role), role2text(req_role));
2359         }
2360     }
2361 
2362     if (saved_role > rsc->role) {
2363         rsc->role = saved_role;
2364     }
2365 
2366     return rsc;
2367 }
2368 
2369 static void
2370 handle_orphaned_container_fillers(xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
2371 {
2372     xmlNode *rsc_entry = NULL;
2373     for (rsc_entry = pcmk__xe_first_child(lrm_rsc_list); rsc_entry != NULL;
2374          rsc_entry = pcmk__xe_next(rsc_entry)) {
2375 
2376         pe_resource_t *rsc;
2377         pe_resource_t *container;
2378         const char *rsc_id;
2379         const char *container_id;
2380 
2381         if (!pcmk__str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, pcmk__str_casei)) {
2382             continue;
2383         }
2384 
2385         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
2386         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2387         if (container_id == NULL || rsc_id == NULL) {
2388             continue;
2389         }
2390 
2391         container = pe_find_resource(data_set->resources, container_id);
2392         if (container == NULL) {
2393             continue;
2394         }
2395 
2396         rsc = pe_find_resource(data_set->resources, rsc_id);
2397         if (rsc == NULL ||
2398             !pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler) ||
2399             rsc->container != NULL) {
2400             continue;
2401         }
2402 
2403         pe_rsc_trace(rsc, "Mapped container of orphaned resource %s to %s",
2404                      rsc->id, container_id);
2405         rsc->container = container;
2406         container->fillers = g_list_append(container->fillers, rsc);
2407     }
2408 }
2409 
2410 static void
2411 unpack_lrm_resources(pe_node_t *node, xmlNode *lrm_rsc_list,
2412                      pe_working_set_t *data_set)
2413 {
2414     xmlNode *rsc_entry = NULL;
2415     gboolean found_orphaned_container_filler = FALSE;
2416 
2417     for (rsc_entry = pcmk__xe_first_child(lrm_rsc_list); rsc_entry != NULL;
2418          rsc_entry = pcmk__xe_next(rsc_entry)) {
2419 
2420         if (pcmk__str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, pcmk__str_none)) {
2421             pe_resource_t *rsc = unpack_lrm_rsc_state(node, rsc_entry, data_set);
2422             if (!rsc) {
2423                 continue;
2424             }
2425             if (pcmk_is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
2426                 found_orphaned_container_filler = TRUE;
2427             }
2428         }
2429     }
2430 
2431     /* Now that all the resource state has been unpacked for this node,
2432      * go back and map any orphaned container fillers to their
2433      * container resource. */
2434     if (found_orphaned_container_filler) {
2435         handle_orphaned_container_fillers(lrm_rsc_list, data_set);
2436     }
2437 }
2438 
2439 static void
2440 set_active(pe_resource_t * rsc)
2441 {
2442     pe_resource_t *top = uber_parent(rsc);
2443 
2444     if (top && pcmk_is_set(top->flags, pe_rsc_promotable)) {
2445         rsc->role = RSC_ROLE_SLAVE;
2446     } else {
2447         rsc->role = RSC_ROLE_STARTED;
2448     }
2449 }
2450 
2451 static void
2452 set_node_score(gpointer key, gpointer value, gpointer user_data)
2453 {
2454     pe_node_t *node = value;
2455     int *score = user_data;
2456 
2457     node->weight = *score;
2458 }
2459 
2460 #define STATUS_PATH_MAX 1024
2461 static xmlNode *
2462 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
2463             bool success_only, pe_working_set_t *data_set)
2464 {
2465     int offset = 0;
2466     char xpath[STATUS_PATH_MAX];
2467     xmlNode *xml = NULL;
2468 
2469     offset += snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//node_state[@uname='%s']", node);
2470     offset +=
2471         snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//" XML_LRM_TAG_RESOURCE "[@id='%s']",
2472                  resource);
2473 
2474     /* Need to check against transition_magic too? */
2475     if (source && pcmk__str_eq(op, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
2476         offset +=
2477             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2478                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_target='%s']", op,
2479                      source);
2480     } else if (source && pcmk__str_eq(op, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
2481         offset +=
2482             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2483                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_source='%s']", op,
2484                      source);
2485     } else {
2486         offset +=
2487             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2488                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s']", op);
2489     }
2490 
2491     CRM_LOG_ASSERT(offset > 0);
2492     xml = get_xpath_object(xpath, data_set->input, LOG_DEBUG);
2493 
2494     if (xml && success_only) {
2495         int rc = PCMK_OCF_UNKNOWN_ERROR;
2496         int status = PCMK_LRM_OP_ERROR;
2497 
2498         crm_element_value_int(xml, XML_LRM_ATTR_RC, &rc);
2499         crm_element_value_int(xml, XML_LRM_ATTR_OPSTATUS, &status);
2500         if ((rc != PCMK_OCF_OK) || (status != PCMK_LRM_OP_DONE)) {
2501             return NULL;
2502         }
2503     }
2504     return xml;
2505 }
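     /* Illustrative result (hypothetical names): for resource "rsc1",
      * op "monitor", and node "node1", with no source, the constructed XPath
      * is, assuming the usual tag expansions:
      *   //node_state[@uname='node1']//lrm_resource[@id='rsc1']
      *       /lrm_rsc_op[@operation='monitor']
      */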
2506 
2507 static int
2508 pe__call_id(xmlNode *op_xml)
2509 {
2510     int id = 0;
2511 
2512     if (op_xml) {
2513         crm_element_value_int(op_xml, XML_LRM_ATTR_CALLID, &id);
2514     }
2515     return id;
2516 }
2517 
2518 /*!
2519  * \brief Check whether a stop happened on the same node after some event
2520  *
2521  * \param[in] rsc       Resource being checked
2522  * \param[in] node      Node being checked
2523  * \param[in] xml_op    Event that stop is being compared to
2524  * \param[in] data_set  Cluster working set
2525  *
2526  * \return TRUE if stop happened after event, FALSE otherwise
2527  *
2528  * \note This is really unnecessary, but kept as a safety mechanism. We
2529  *       currently don't save more than one successful event in history, so this
2530  *       only matters when processing really old CIB files that we don't
2531  *       technically support anymore, or as preparation for logging an extended
2532  *       history in the future.
2533  */
2534 static bool
2535 stop_happened_after(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
2536                     pe_working_set_t *data_set)
2537 {
2538     xmlNode *stop_op = find_lrm_op(rsc->id, CRMD_ACTION_STOP,
2539                                    node->details->uname, NULL, TRUE, data_set);
2540 
2541     return (stop_op && (pe__call_id(stop_op) > pe__call_id(xml_op)));
2542 }
2543 
2544 static void
2545 unpack_migrate_to_success(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
2546                           pe_working_set_t *data_set)
2547 {
2548     /* A successful migration sequence is:
2549      *    migrate_to on source node
2550      *    migrate_from on target node
2551      *    stop on source node
2552      *
2553      * If a migrate_to is followed by a stop, the entire migration (successful
2554      * or failed) is complete, and we don't care what happened on the target.
2555      *
2556      * If no migrate_from has happened, the migration is considered to be
2557      * "partial". If the migrate_from failed, make sure the resource gets
2558      * stopped on both source and target (if up).
2559      *
2560      * If the migrate_to and migrate_from both succeeded (which also implies the
2561      * resource is no longer running on the source), but there is no stop, the
2562      * migration is considered to be "dangling". Schedule a stop on the source
2563      * in this case.
2564      */
2565     int from_rc = 0;
2566     int from_status = 0;
2567     pe_node_t *target_node = NULL;
2568     pe_node_t *source_node = NULL;
2569     xmlNode *migrate_from = NULL;
2570     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2571     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2572 
2573     // Sanity check
2574     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2575 
2576     if (stop_happened_after(rsc, node, xml_op, data_set)) {
2577         return;
2578     }
2579 
2580     // Clones are not allowed to migrate, so role can't be master
2581     rsc->role = RSC_ROLE_STARTED;
2582 
2583     target_node = pe_find_node(data_set->nodes, target);
2584     source_node = pe_find_node(data_set->nodes, source);
2585 
2586     // Check whether there was a migrate_from action on the target
2587     migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2588                                source, FALSE, data_set);
2589     if (migrate_from) {
2590         crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
2591         crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS, &from_status);
2592         pe_rsc_trace(rsc, "%s op on %s exited with status=%d, rc=%d",
2593                      ID(migrate_from), target, from_status, from_rc);
2594     }
2595 
2596     if (migrate_from && from_rc == PCMK_OCF_OK
2597         && from_status == PCMK_LRM_OP_DONE) {
2598         /* The migrate_to and migrate_from both succeeded, so mark the migration
2599          * as "dangling". This will be used to schedule a stop action on the
2600          * source without affecting the target.
2601          */
2602         pe_rsc_trace(rsc, "Detected dangling migration op: %s on %s", ID(xml_op),
2603                      source);
2604         rsc->role = RSC_ROLE_STOPPED;
2605         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2606 
2607     } else if (migrate_from && (from_status != PCMK_LRM_OP_PENDING)) { // Failed
2608         if (target_node && target_node->details->online) {
2609             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2610                          target_node->details->online);
2611             native_add_running(rsc, target_node, data_set);
2612         }
2613 
2614     } else { // Pending, or complete but erased
2615         if (target_node && target_node->details->online) {
2616             pe_rsc_trace(rsc, "Marking active on %s %p %d", target, target_node,
2617                          target_node->details->online);
2618 
2619             native_add_running(rsc, target_node, data_set);
2620             if (source_node && source_node->details->online) {
2621                 /* This is a partial migration: the migrate_to completed
2622                  * successfully on the source, but the migrate_from has not
2623                  * completed. Remember the source and target; if the newly
2624                  * chosen target remains the same when we schedule actions
2625                  * later, we may continue with the migration.
2626                  */
2627                 rsc->partial_migration_target = target_node;
2628                 rsc->partial_migration_source = source_node;
2629             }
2630         } else {
2631             /* Consider it failed here - forces a restart, prevents migration */
2632             pe__set_resource_flags(rsc, pe_rsc_failed|pe_rsc_stop);
2633             pe__clear_resource_flags(rsc, pe_rsc_allow_migrate);
2634         }
2635     }
2636 }
2637 
2638 // Is there an action_name in node_name's rsc history newer than call_id?
2639 static bool
2640 newer_op(pe_resource_t *rsc, const char *action_name, const char *node_name,
2641          int call_id, pe_working_set_t *data_set)
2642 {
2643     xmlNode *action = find_lrm_op(rsc->id, action_name, node_name, NULL, TRUE,
2644                                   data_set);
2645 
2646     return pe__call_id(action) > call_id;
2647 }
2648 
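/*!
 * \internal
 * \brief Update resource state after a failed migrate_to action
 *
 * \param[in,out] rsc       Resource that failed to migrate
 * \param[in]     node      Migration source node
 * \param[in]     xml_op    migrate_to operation history entry
 * \param[in]     data_set  Cluster working set
 */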
2649 static void
2650 unpack_migrate_to_failure(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
2651                           pe_working_set_t *data_set)
2652 {
2653     int target_stop_id = 0;
2654     int target_migrate_from_id = 0;
2655     xmlNode *target_stop = NULL;
2656     xmlNode *target_migrate_from = NULL;
2657     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2658     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2659 
2660     // Sanity check
2661     CRM_CHECK(source && target && !strcmp(source, node->details->uname), return);
2662 
2663     /* If a migration failed, we have to assume the resource is active. Clones
2664      * are not allowed to migrate, so role can't be master.
2665      */
2666     rsc->role = RSC_ROLE_STARTED;
2667 
2668     // Check for stop on the target
2669     target_stop = find_lrm_op(rsc->id, CRMD_ACTION_STOP, target, NULL,
2670                               TRUE, data_set);
2671     target_stop_id = pe__call_id(target_stop);
2672 
2673     // Check for migrate_from on the target
2674     target_migrate_from = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, target,
2675                                       source, TRUE, data_set);
2676     target_migrate_from_id = pe__call_id(target_migrate_from);
2677 
2678     if ((target_stop == NULL) || (target_stop_id < target_migrate_from_id)) {
2679         /* There was no stop on the target, or a stop that happened before a
2680          * migrate_from, so assume the resource is still active on the target
2681          * (if it is up).
2682          */
2683         pe_node_t *target_node = pe_find_node(data_set->nodes, target);
2684 
2685         pe_rsc_trace(rsc, "stop (%d) + migrate_from (%d)",
2686                      target_stop_id, target_migrate_from_id);
2687         if (target_node && target_node->details->online) {
2688             native_add_running(rsc, target_node, data_set);
2689         }
2690 
2691     } else if (target_migrate_from == NULL) {
2692         /* We know there was a stop on the target, but there may not have been a
2693          * migrate_from (the stop could have happened before migrate_from was
2694          * scheduled or attempted).
2695          *
2696          * That means this could be a "dangling" migration. But first, check
2697          * whether there is a newer successful stop, start, or migrate_from on
2698          * the source node -- it's possible the failed migration was followed by
2699          * a successful stop, full restart, or migration in the reverse
2700          * direction, in which case we don't want to force a stop.
2701          */
2702         int source_migrate_to_id = pe__call_id(xml_op);
2703 
2704         if (newer_op(rsc, CRMD_ACTION_MIGRATED, source, source_migrate_to_id,
2705                      data_set)
2706             || newer_op(rsc, CRMD_ACTION_START, source, source_migrate_to_id,
2707                      data_set)
2708             || newer_op(rsc, CRMD_ACTION_STOP, source, source_migrate_to_id,
2709                      data_set)) {
2710             return;
2711         }
2712 
2713         // Mark node as having dangling migration so we can force a stop later
2714         rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2715     }
2716 }
2717 
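/*!
 * \internal
 * \brief Update resource state after a failed migrate_from action
 *
 * \param[in,out] rsc       Resource that failed to migrate
 * \param[in]     node      Migration target node
 * \param[in]     xml_op    migrate_from operation history entry
 * \param[in]     data_set  Cluster working set
 */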
2718 static void
2719 unpack_migrate_from_failure(pe_resource_t *rsc, pe_node_t *node,
2720                             xmlNode *xml_op, pe_working_set_t *data_set)
2721 {
2722     xmlNode *source_stop = NULL;
2723     xmlNode *source_migrate_to = NULL;
2724     const char *source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2725     const char *target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2726 
2727     // Sanity check
2728     CRM_CHECK(source && target && !strcmp(target, node->details->uname), return);
2729 
2730     /* If a migration failed, we have to assume the resource is active. Clones
2731      * are not allowed to migrate, so role can't be master.
2732      */
2733     rsc->role = RSC_ROLE_STARTED;
2734 
2735     // Check for a stop on the source
2736     source_stop = find_lrm_op(rsc->id, CRMD_ACTION_STOP, source, NULL,
2737                               TRUE, data_set);
2738 
2739     // Check for a migrate_to on the source
2740     source_migrate_to = find_lrm_op(rsc->id, CRMD_ACTION_MIGRATE,
2741                                     source, target, TRUE, data_set);
2742 
2743     if ((source_stop == NULL)
2744         || (pe__call_id(source_stop) < pe__call_id(source_migrate_to))) {
2745         /* There was no stop on the source, or a stop that happened before
2746          * migrate_to, so assume the resource is still active on the source (if
2747          * it is up).
2748          */
2749         pe_node_t *source_node = pe_find_node(data_set->nodes, source);
2750 
2751         if (source_node && source_node->details->online) {
2752             native_add_running(rsc, source_node, data_set);
2753         }
2754     }
2755 }
2756 
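/*!
 * \internal
 * \brief Add an operation to the working set's list of failed operations
 *
 * The operation is copied into data_set->failed, unless the node is offline
 * or the list already has an entry for the same operation and node.
 *
 * \param[in] op        Operation history entry XML
 * \param[in] node      Node where operation was executed
 * \param[in] rsc       Resource that operation is for
 * \param[in] data_set  Cluster working set
 */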
2757 static void
2758 record_failed_op(xmlNode *op, const pe_node_t *node,
2759                  const pe_resource_t *rsc, pe_working_set_t *data_set)
2760 {
2761     xmlNode *xIter = NULL;
2762     const char *op_key = crm_element_value(op, XML_LRM_ATTR_TASK_KEY);
2763 
2764     if (node->details->online == FALSE) {
2765         return;
2766     }
2767 
2768     for (xIter = data_set->failed->children; xIter; xIter = xIter->next) {
2769         const char *key = crm_element_value(xIter, XML_LRM_ATTR_TASK_KEY);
2770         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
2771 
2772         if (pcmk__str_eq(op_key, key, pcmk__str_casei) && pcmk__str_eq(uname, node->details->uname, pcmk__str_casei)) {
2773             crm_trace("Skipping duplicate entry %s on %s", op_key, node->details->uname);
2774             return;
2775         }
2776     }
2777 
2778     crm_trace("Adding entry %s on %s", op_key, node->details->uname);
2779     crm_xml_add(op, XML_ATTR_UNAME, node->details->uname);
2780     crm_xml_add(op, XML_LRM_ATTR_RSCID, rsc->id);
2781     add_node_copy(data_set->failed, op);
2782 }
2783 
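/*!
 * \internal
 * \brief Get the operation key from an operation history entry
 *
 * \param[in] xml_op  Operation history entry XML
 *
 * \return Entry's operation key, or its XML ID if the key is not set
 */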
2784 static const char *get_op_key(xmlNode *xml_op)
2785 {
2786     const char *key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY);
2787     if (key == NULL) {
2788         key = ID(xml_op);
2789     }
2790     return key;
2791 }
2792 
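/*!
 * \internal
 * \brief Get a display-friendly timestamp of when an operation last changed
 *
 * \param[in] xml_op  Operation history entry XML
 *
 * \return Time of entry's last change (without day of week, to keep messages
 *         shorter), or "unknown time" if not recorded
 */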
2793 static const char *
2794 last_change_str(xmlNode *xml_op)
2795 {
2796     time_t when;
2797     const char *when_s = NULL;
2798 
2799     if (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
2800                                 &when) == pcmk_ok) {
2801         when_s = pcmk__epoch2str(&when);
2802         if (when_s) {
2803             // Skip day of week to make message shorter
2804             when_s = strchr(when_s, ' ');
2805             if (when_s) {
2806                 ++when_s;
2807             }
2808         }
2809     }
2810     return ((when_s && *when_s)? when_s : "unknown time");
2811 }
2812 
2813 /*!
2814  * \internal
2815  * \brief Compare two on-fail values
2816  *
2817  * \param[in] first   One on-fail value to compare
2818  * \param[in] second  The other on-fail value to compare
2819  *
2820  * \return A negative number if second is more severe than first, zero if they
2821  *         are equal, or a positive number if first is more severe than second.
2822  * \note This is only needed until the action_fail_response values can be
2823  *       renumbered at the next API compatibility break.
2824  */
2825 static int
2826 cmp_on_fail(enum action_fail_response first, enum action_fail_response second)
2827 {
2828     switch (first) {
2829         case action_fail_demote:
2830             switch (second) {
2831                 case action_fail_ignore:
2832                     return 1;
2833                 case action_fail_demote:
2834                     return 0;
2835                 default:
2836                     return -1;
2837             }
2838             break;
2839 
2840         case action_fail_reset_remote:
2841             switch (second) {
2842                 case action_fail_ignore:
2843                 case action_fail_demote:
2844                 case action_fail_recover:
2845                     return 1;
2846                 case action_fail_reset_remote:
2847                     return 0;
2848                 default:
2849                     return -1;
2850             }
2851             break;
2852 
2853         case action_fail_restart_container:
2854             switch (second) {
2855                 case action_fail_ignore:
2856                 case action_fail_demote:
2857                 case action_fail_recover:
2858                 case action_fail_reset_remote:
2859                     return 1;
2860                 case action_fail_restart_container:
2861                     return 0;
2862                 default:
2863                     return -1;
2864             }
2865             break;
2866 
2867         default:
2868             break;
2869     }
2870     switch (second) {
2871         case action_fail_demote:
2872             return (first == action_fail_ignore)? -1 : 1;
2873 
2874         case action_fail_reset_remote:
2875             switch (first) {
2876                 case action_fail_ignore:
2877                 case action_fail_demote:
2878                 case action_fail_recover:
2879                     return -1;
2880                 default:
2881                     return 1;
2882             }
2883             break;
2884 
2885         case action_fail_restart_container:
2886             switch (first) {
2887                 case action_fail_ignore:
2888                 case action_fail_demote:
2889                 case action_fail_recover:
2890                 case action_fail_reset_remote:
2891                     return -1;
2892                 default:
2893                     return 1;
2894             }
2895             break;
2896 
2897         default:
2898             break;
2899     }
2900     return first - second;
2901 }
2902 
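/*!
 * \internal
 * \brief Update resource and working-set state after a failed operation
 *
 * \param[in,out] rsc           Resource that operation failed for
 * \param[in]     node          Node where operation failed
 * \param[in]     rc            Actual return code of operation
 * \param[in]     xml_op        Operation history entry XML
 * \param[out]    last_failure  Where to record this entry as the last failure
 * \param[in,out] on_fail       What should be done about the result
 * \param[in]     data_set      Cluster working set
 */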
2903 static void
2904 unpack_rsc_op_failure(pe_resource_t *rsc, pe_node_t *node, int rc, xmlNode *xml_op, xmlNode **last_failure,
2905                       enum action_fail_response *on_fail, pe_working_set_t *data_set)
2906 {
2907     guint interval_ms = 0;
2908     bool is_probe = false;
2909     pe_action_t *action = NULL;
2910 
2911     const char *key = get_op_key(xml_op);
2912     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
2913     const char *exit_reason = crm_element_value(xml_op,
2914                                                 XML_LRM_ATTR_EXIT_REASON);
2915 
2916     CRM_ASSERT(rsc);
2917     CRM_CHECK(task != NULL, return);
2918 
2919     *last_failure = xml_op;
2920 
2921     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
2922     if ((interval_ms == 0) && !strcmp(task, CRMD_ACTION_STATUS)) {
2923         is_probe = true;
2924     }
2925 
2926     if (exit_reason == NULL) {
2927         exit_reason = "";
2928     }
2929 
2930     if (!pcmk_is_set(data_set->flags, pe_flag_symmetric_cluster)
2931         && (rc == PCMK_OCF_NOT_INSTALLED)) {
2932         crm_trace("Unexpected result (%s%s%s) was recorded for "
2933                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
2934                   services_ocf_exitcode_str(rc),
2935                   (*exit_reason? ": " : ""), exit_reason,
2936                   (is_probe? "probe" : task), rsc->id, node->details->uname,
2937                   last_change_str(xml_op), rc, ID(xml_op));
2938     } else {
2939         crm_warn("Unexpected result (%s%s%s) was recorded for "
2940                   "%s of %s on %s at %s " CRM_XS " rc=%d id=%s",
2941                  services_ocf_exitcode_str(rc),
2942                  (*exit_reason? ": " : ""), exit_reason,
2943                  (is_probe? "probe" : task), rsc->id, node->details->uname,
2944                  last_change_str(xml_op), rc, ID(xml_op));
2945 
2946         if (is_probe && (rc != PCMK_OCF_OK)
2947             && (rc != PCMK_OCF_NOT_RUNNING)
2948             && (rc != PCMK_OCF_RUNNING_MASTER)) {
2949 
2950             /* A failed (not just unexpected) probe result could mean the user
2951              * didn't know resources will be probed even where they can't run.
2952              */
2953             crm_notice("If it is not possible for %s to run on %s, see "
2954                        "the resource-discovery option for location constraints",
2955                        rsc->id, node->details->uname);
2956         }
2957 
2958         record_failed_op(xml_op, node, rsc, data_set);
2959     }
2960 
2961     action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
2962     if (cmp_on_fail(*on_fail, action->on_fail) < 0) {
2963         pe_rsc_trace(rsc, "on-fail %s -> %s for %s (%s)", fail2text(*on_fail),
2964                      fail2text(action->on_fail), action->uuid, key);
2965         *on_fail = action->on_fail;
2966     }
2967 
2968     if (!strcmp(task, CRMD_ACTION_STOP)) {
2969         resource_location(rsc, node, -INFINITY, "__stop_fail__", data_set);
2970 
2971     } else if (!strcmp(task, CRMD_ACTION_MIGRATE)) {
2972         unpack_migrate_to_failure(rsc, node, xml_op, data_set);
2973 
2974     } else if (!strcmp(task, CRMD_ACTION_MIGRATED)) {
2975         unpack_migrate_from_failure(rsc, node, xml_op, data_set);
2976 
2977     } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
2978         rsc->role = RSC_ROLE_MASTER;
2979 
2980     } else if (!strcmp(task, CRMD_ACTION_DEMOTE)) {
2981         if (action->on_fail == action_fail_block) {
2982             rsc->role = RSC_ROLE_MASTER;
2983             rsc->next_role = RSC_ROLE_STOPPED;
2984 
2985         } else if (rc == PCMK_OCF_NOT_RUNNING) {
2986             rsc->role = RSC_ROLE_STOPPED;
2987 
2988         } else {
2989             /* Staying in master role would put the scheduler and controller
2990              * into a loop. Setting slave role is not dangerous because the
2991              * resource will be stopped as part of recovery, and any master
2992              * promotion will be ordered after that stop.
2993              */
2994             rsc->role = RSC_ROLE_SLAVE;
2995         }
2996     }
2997 
2998     if (is_probe && (rc == PCMK_OCF_NOT_INSTALLED)) {
2999         /* leave stopped */
3000         pe_rsc_trace(rsc, "Leaving %s stopped", rsc->id);
3001         rsc->role = RSC_ROLE_STOPPED;
3002 
3003     } else if (rsc->role < RSC_ROLE_STARTED) {
3004         pe_rsc_trace(rsc, "Setting %s active", rsc->id);
3005         set_active(rsc);
3006     }
3007 
3008     pe_rsc_trace(rsc, "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
3009                  rsc->id, role2text(rsc->role),
3010                  pcmk__btoa(node->details->unclean),
3011                  fail2text(action->on_fail), role2text(action->fail_role));
3012 
3013     if (action->fail_role != RSC_ROLE_STARTED && rsc->next_role < action->fail_role) {
3014         rsc->next_role = action->fail_role;
3015     }
3016 
3017     if (action->fail_role == RSC_ROLE_STOPPED) {
3018         int score = -INFINITY;
3019 
3020         pe_resource_t *fail_rsc = rsc;
3021 
3022         if (fail_rsc->parent) {
3023             pe_resource_t *parent = uber_parent(fail_rsc);
3024 
3025             if (pe_rsc_is_clone(parent)
3026                 && !pcmk_is_set(parent->flags, pe_rsc_unique)) {
3027                 /* For anonymous clones, if one instance fails an operation
3028                  * with on-fail=stop, all instances must be stopped. Do this
3029                  * by preventing the parent from coming up again. */
3030                 fail_rsc = parent;
3031             }
3032         }
3033         crm_notice("%s will not be started under current conditions",
3034                    fail_rsc->id);
3035         /* make sure it doesn't come up again */
3036         if (fail_rsc->allowed_nodes != NULL) {
3037             g_hash_table_destroy(fail_rsc->allowed_nodes);
3038         }
3039         fail_rsc->allowed_nodes = pe__node_list2table(data_set->nodes);
3040         g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
3041     }
3042 
3043     pe_free_action(action);
3044 }
3045 
3046 /*!
3047  * \internal
3048  * \brief Remap operation status based on action result
3049  *
3050  * Given an action result, determine an appropriate operation status for the
3051  * purposes of responding to the action (the status provided by the executor is
3052  * not directly usable since the executor does not know what was expected).
3053  *
3054  * \param[in,out] rsc        Resource that operation history entry is for
3055  * \param[in]     rc         Actual return code of operation
3056  * \param[in]     target_rc  Expected return code of operation
3057  * \param[in]     node       Node where operation was executed
3058  * \param[in]     xml_op     Operation history entry XML from CIB status
3059  * \param[in,out] on_fail    What should be done about the result
3060  * \param[in]     data_set   Current cluster working set
3061  *
3062  * \return Operation status based on return code and action info
3063  * \note This may update the resource's current and next role.
3064  */
3065 static int
3066 determine_op_status(
3067     pe_resource_t *rsc, int rc, int target_rc, pe_node_t *node, xmlNode *xml_op, enum action_fail_response *on_fail, pe_working_set_t *data_set)
3068 {
3069     guint interval_ms = 0;
3070     bool is_probe = false;
3071     int result = PCMK_LRM_OP_DONE;
3072     const char *key = get_op_key(xml_op);
3073     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3074     const char *exit_reason = crm_element_value(xml_op,
3075                                                 XML_LRM_ATTR_EXIT_REASON);
3076 
3077     CRM_ASSERT(rsc);
3078     CRM_CHECK(task != NULL, return PCMK_LRM_OP_ERROR);
3079 
3080     if (exit_reason == NULL) {
3081         exit_reason = "";
3082     }
3083 
3084     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3085     if ((interval_ms == 0) && !strcmp(task, CRMD_ACTION_STATUS)) {
3086         is_probe = true;
3087         task = "probe";
3088     }
3089 
3090     if (target_rc < 0) {
3091         /* Pre-1.0 Pacemaker versions, and Pacemaker 1.1.6 or earlier with
3092          * Heartbeat 2.0.7 or earlier as the cluster layer, did not include the
3093          * target_rc in the transition key, which (along with the similar case
3094          * of a corrupted transition key in the CIB) will be reported to this
3095          * function as -1. Pacemaker 2.0+ does not support rolling upgrades from
3096          * those versions or processing of saved CIB files from those versions,
3097          * so we do not need to care much about this case.
3098          */
3099         result = PCMK_LRM_OP_ERROR;
3100         crm_warn("Expected result not found for %s on %s (corrupt or obsolete CIB?)",
3101                  key, node->details->uname);
3102 
3103     } else if (target_rc != rc) {
3104         result = PCMK_LRM_OP_ERROR;
3105         pe_rsc_debug(rsc, "%s on %s: expected %d (%s), got %d (%s%s%s)",
3106                      key, node->details->uname,
3107                      target_rc, services_ocf_exitcode_str(target_rc),
3108                      rc, services_ocf_exitcode_str(rc),
3109                      (*exit_reason? ": " : ""), exit_reason);
3110     }
3111 
3112     switch (rc) {
3113         case PCMK_OCF_OK:
3114             if (is_probe && (target_rc == PCMK_OCF_NOT_RUNNING)) {
3115                 result = PCMK_LRM_OP_DONE;
3116                 pe_rsc_info(rsc, "Probe found %s active on %s at %s",
3117                             rsc->id, node->details->uname,
3118                             last_change_str(xml_op));
3119             }
3120             break;
3121 
3122         case PCMK_OCF_NOT_RUNNING:
3123             if (is_probe || (target_rc == rc)
3124                 || !pcmk_is_set(rsc->flags, pe_rsc_managed)) {
3125 
3126                 result = PCMK_LRM_OP_DONE;
3127                 rsc->role = RSC_ROLE_STOPPED;
3128 
3129                 /* clear any previous failure actions */
3130                 *on_fail = action_fail_ignore;
3131                 rsc->next_role = RSC_ROLE_UNKNOWN;
3132             }
3133             break;
3134 
3135         case PCMK_OCF_RUNNING_MASTER:
3136             if (is_probe && (rc != target_rc)) {
3137                 result = PCMK_LRM_OP_DONE;
3138                 pe_rsc_info(rsc,
3139                             "Probe found %s active and promoted on %s at %s",
3140                             rsc->id, node->details->uname,
3141                             last_change_str(xml_op));
3142             }
3143             rsc->role = RSC_ROLE_MASTER;
3144             break;
3145 
3146         case PCMK_OCF_DEGRADED_MASTER:
3147         case PCMK_OCF_FAILED_MASTER:
3148             rsc->role = RSC_ROLE_MASTER;
3149             result = PCMK_LRM_OP_ERROR;
3150             break;
3151 
3152         case PCMK_OCF_NOT_CONFIGURED:
3153             result = PCMK_LRM_OP_ERROR_FATAL;
3154             break;
3155 
3156         case PCMK_OCF_UNIMPLEMENT_FEATURE:
3157             if (interval_ms > 0) {
3158                 result = PCMK_LRM_OP_NOTSUPPORTED;
3159                 break;
3160             }
3161             // fall through
3162         case PCMK_OCF_NOT_INSTALLED:
3163         case PCMK_OCF_INVALID_PARAM:
3164         case PCMK_OCF_INSUFFICIENT_PRIV:
3165             if (!pe_can_fence(data_set, node)
3166                 && !strcmp(task, CRMD_ACTION_STOP)) {
3167                 /* If a stop fails and we can't fence, there's nothing else we can do */
3168                 pe_proc_err("No further recovery can be attempted for %s "
3169                             "because %s on %s failed (%s%s%s) at %s "
3170                             CRM_XS " rc=%d id=%s", rsc->id, task,
3171                             node->details->uname, services_ocf_exitcode_str(rc),
3172                             (*exit_reason? ": " : ""), exit_reason,
3173                             last_change_str(xml_op), rc, ID(xml_op));
3174                 pe__clear_resource_flags(rsc, pe_rsc_managed);
3175                 pe__set_resource_flags(rsc, pe_rsc_block);
3176             }
3177             result = PCMK_LRM_OP_ERROR_HARD;
3178             break;
3179 
3180         default:
3181             if (result == PCMK_LRM_OP_DONE) {
3182                 crm_info("Treating unknown exit status %d from %s of %s "
3183                          "on %s at %s as failure",
3184                          rc, task, rsc->id, node->details->uname,
3185                          last_change_str(xml_op));
3186                 result = PCMK_LRM_OP_ERROR;
3187             }
3188             break;
3189     }
3190     return result;
3191 }
3192 
3193 // Return TRUE if a start or monitor last failure should be cleared because resource parameters changed
3194 static bool
3195 should_clear_for_param_change(xmlNode *xml_op, const char *task,
3196                               pe_resource_t *rsc, pe_node_t *node,
3197                               pe_working_set_t *data_set)
3198 {
3199     if (!strcmp(task, "start") || !strcmp(task, "monitor")) {
3200 
3201         if (pe__bundle_needs_remote_name(rsc)) {
3202             /* We haven't allocated resources yet, so we can't reliably
3203              * substitute addr parameters for the REMOTE_CONTAINER_HACK.
3204              * When that's needed, defer the check until later.
3205              */
3206             pe__add_param_check(xml_op, rsc, node, pe_check_last_failure,
3207                                 data_set);
3208 
3209         } else {
3210             op_digest_cache_t *digest_data = NULL;
3211 
3212             digest_data = rsc_action_digest_cmp(rsc, xml_op, node, data_set);
3213             switch (digest_data->rc) {
3214                 case RSC_DIGEST_UNKNOWN:
3215                     crm_trace("Resource %s history entry %s on %s"
3216                               " has no digest to compare",
3217                               rsc->id, get_op_key(xml_op), node->details->id);
3218                     break;
3219                 case RSC_DIGEST_MATCH:
3220                     break;
3221                 default:
3222                     return TRUE;
3223             }
3224         }
3225     }
3226     return FALSE;
3227 }
3228 
3229 // Order action after fencing of remote node, given connection rsc
3230 static void
3231 order_after_remote_fencing(pe_action_t *action, pe_resource_t *remote_conn,
3232                            pe_working_set_t *data_set)
3233 {
3234     pe_node_t *remote_node = pe_find_node(data_set->nodes, remote_conn->id);
3235 
3236     if (remote_node) {
3237         pe_action_t *fence = pe_fence_op(remote_node, NULL, TRUE, NULL,
3238                                          FALSE, data_set);
3239 
3240         order_actions(fence, action, pe_order_implies_then);
3241     }
3242 }
3243 
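/*!
 * \internal
 * \brief Check whether clearing a failure should wait (see comment below)
 *
 * \param[in] rsc              Resource that operation is for
 * \param[in] xml_op           Operation history entry XML
 * \param[in] task             Name of operation
 * \param[in] interval_ms      Operation interval in milliseconds
 * \param[in] is_last_failure  Whether entry is a last_failure entry
 * \param[in] data_set         Cluster working set
 *
 * \return TRUE if the failure timeout should be ignored for now
 */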
3244 static bool
3245 should_ignore_failure_timeout(pe_resource_t *rsc, xmlNode *xml_op,
3246                               const char *task, guint interval_ms,
3247                               bool is_last_failure, pe_working_set_t *data_set)
3248 {
3249     /* Clearing failures of recurring monitors has special concerns. The
3250      * executor reports only changes in the monitor result, so if the
3251      * monitor is still active and still getting the same failure result,
3252      * that will go undetected after the failure is cleared.
3253      *
3254      * Also, the operation history will have the time when the recurring
3255      * monitor result changed to the given code, not the time when the
3256      * result last happened.
3257      *
3258      * @TODO We probably should clear such failures only when the failure
3259      * timeout has passed since the last occurrence of the failed result.
3260      * However we don't record that information. We could maybe approximate
3261      * that by clearing only if there is a more recent successful monitor or
3262      * stop result, but we don't even have that information at this point
3263      * since we are still unpacking the resource's operation history.
3264      *
3265      * This is especially important for remote connection resources with a
3266      * reconnect interval, so in that case, we skip clearing failures
3267      * if the remote node hasn't been fenced.
3268      */
3269     if (rsc->remote_reconnect_ms
3270         && pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3271         && (interval_ms != 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3272 
3273         pe_node_t *remote_node = pe_find_node(data_set->nodes, rsc->id);
3274 
3275         if (remote_node && !remote_node->details->remote_was_fenced) {
3276             if (is_last_failure) {
3277                 crm_info("Waiting to clear monitor failure for remote node %s"
3278                          " until fencing has occurred", rsc->id);
3279             }
3280             return TRUE;
3281         }
3282     }
3283     return FALSE;
3284 }
3285 
3286 /*!
3287  * \internal
3288  * \brief Check operation age and schedule failure clearing when appropriate
3289  *
3290  * This function has two distinct purposes. The first is to check whether an
3291  * operation history entry is expired (i.e. the resource has a failure timeout,
3292  * the entry is older than the timeout, and the resource either has no fail
3293  * count or its fail count is entirely older than the timeout). The second is
3294  * to schedule fail count clearing when appropriate: when the operation is
3295  * expired and either the fail count has expired too or the operation is a
3296  * last_failure for a remote connection resource with a reconnect interval,
3297  * or when the operation is a last_failure for a start or monitor and the
3298  * resource's parameters have changed since the operation ran.
3299  *
3300  * \param[in] rsc       Resource that operation happened to
3301  * \param[in] node      Node that operation happened on
3302  * \param[in] rc        Actual result of operation
3303  * \param[in] xml_op    Operation history entry XML
3304  * \param[in] data_set  Current working set
3305  *
3306  * \return TRUE if operation history entry is expired, FALSE otherwise
3307  */
3308 static bool
3309 check_operation_expiry(pe_resource_t *rsc, pe_node_t *node, int rc,
3310                        xmlNode *xml_op, pe_working_set_t *data_set)
3311 {
3312     bool expired = FALSE;
3313     bool is_last_failure = pcmk__ends_with(ID(xml_op), "_last_failure_0");
3314     time_t last_run = 0;
3315     guint interval_ms = 0;
3316     int unexpired_fail_count = 0;
3317     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3318     const char *clear_reason = NULL;
3319 
3320     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3321 
3322     if ((rsc->failure_timeout > 0)
3323         && (crm_element_value_epoch(xml_op, XML_RSC_OP_LAST_CHANGE,
3324                                     &last_run) == 0)) {
3325 
3326         // Resource has a failure-timeout, and history entry has a timestamp
3327 
3328         time_t now = get_effective_time(data_set);
3329         time_t last_failure = 0;
3330 
3331         // Is this particular operation history older than the failure timeout?
3332         if ((now >= (last_run + rsc->failure_timeout))
3333             && !should_ignore_failure_timeout(rsc, xml_op, task, interval_ms,
3334                                               is_last_failure, data_set)) {
3335             expired = TRUE;
3336         }
3337 
3338         // Does the resource as a whole have an unexpired fail count?
3339         unexpired_fail_count = pe_get_failcount(node, rsc, &last_failure,
3340                                                 pe_fc_effective, xml_op,
3341                                                 data_set);
3342 
3343         // Update scheduler recheck time according to *last* failure
3344         crm_trace("%s@%lld is %sexpired @%lld with unexpired_failures=%d timeout=%ds"
3345                   " last-failure@%lld",
3346                   ID(xml_op), (long long) last_run, (expired? "" : "not "),
3347                   (long long) now, unexpired_fail_count, rsc->failure_timeout,
3348                   (long long) last_failure);
3349         last_failure += rsc->failure_timeout + 1;
3350         if (unexpired_fail_count && (now < last_failure)) {
3351             pe__update_recheck_time(last_failure, data_set);
3352         }
3353     }
3354 
3355     if (expired) {
3356         if (pe_get_failcount(node, rsc, NULL, pe_fc_default, xml_op, data_set)) {
3357 
3358             // There is a fail count ignoring timeout
3359 
3360             if (unexpired_fail_count == 0) {
3361                 // There is no fail count considering timeout
3362                 clear_reason = "it expired";
3363 
3364             } else {
3365                 /* This operation is old, but there is an unexpired fail count.
3366                  * In a properly functioning cluster, this should only be
3367                  * possible if this operation is not a failure (otherwise the
3368                  * fail count should be expired too), so this is really just a
3369                  * failsafe.
3370                  */
3371                 expired = FALSE;
3372             }
3373 
3374         } else if (is_last_failure && rsc->remote_reconnect_ms) {
3375             /* Clear any expired last failure when reconnect interval is set,
3376              * even if there is no fail count.
3377              */
3378             clear_reason = "reconnect interval is set";
3379         }
3380     }
3381 
3382     if (!expired && is_last_failure
3383         && should_clear_for_param_change(xml_op, task, rsc, node, data_set)) {
3384         clear_reason = "resource parameters have changed";
3385     }
3386 
3387     if (clear_reason != NULL) {
3388         // Schedule clearing of the fail count
3389         pe_action_t *clear_op = pe__clear_failcount(rsc, node, clear_reason,
3390                                                     data_set);
3391 
3392         if (pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)
3393             && rsc->remote_reconnect_ms) {
3394             /* If we're clearing a remote connection due to a reconnect
3395              * interval, we want to wait until any scheduled fencing
3396              * completes.
3397              *
3398              * We could limit this to remote_node->details->unclean, but at
3399              * this point, that's always true (it won't be reliable until
3400              * after unpack_node_loop() is done).
3401              */
3402             crm_info("Clearing %s failure will wait until any scheduled "
3403                      "fencing of %s completes", task, rsc->id);
3404             order_after_remote_fencing(clear_op, rsc, data_set);
3405         }
3406     }
3407 
3408     if (expired && (interval_ms == 0) && pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3409         switch(rc) {
3410             case PCMK_OCF_OK:
3411             case PCMK_OCF_NOT_RUNNING:
3412             case PCMK_OCF_RUNNING_MASTER:
3413             case PCMK_OCF_DEGRADED:
3414             case PCMK_OCF_DEGRADED_MASTER:
3415                 // Don't expire probes that return these values
3416                 expired = FALSE;
3417                 break;
3418         }
3419     }
3420 
3421     return expired;
3422 }
3423 
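/*!
 * \internal
 * \brief Get the expected result recorded in an operation history entry
 *
 * \param[in] xml_op  Operation history entry XML
 *
 * \return Target rc from entry's transition key, or -1 if the key is missing
 */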
3424 int pe__target_rc_from_xml(xmlNode *xml_op)
3425 {
3426     int target_rc = 0;
3427     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
3428 
3429     if (key == NULL) {
3430         return -1;
3431     }
3432     decode_transition_key(key, NULL, NULL, NULL, &target_rc);
3433     return target_rc;
3434 }
3435 
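/*!
 * \internal
 * \brief Get the failure handling configured for an action
 *
 * \param[in] rsc       Resource that action is for
 * \param[in] key       Operation key of action
 * \param[in] task      Name of action
 * \param[in] data_set  Cluster working set
 *
 * \return Configured on-fail handling for action
 */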
3436 static enum action_fail_response
3437 get_action_on_fail(pe_resource_t *rsc, const char *key, const char *task, pe_working_set_t *data_set)
3438 {
3439     enum action_fail_response result = action_fail_recover;
3440     pe_action_t *action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
3441 
3442     result = action->on_fail;
3443     pe_free_action(action);
3444 
3445     return result;
3446 }
3447 
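/*!
 * \internal
 * \brief Update a resource's role after an operation treated as successful
 *
 * Also clear failure handling left over from an earlier result, where the
 * completed operation makes that appropriate.
 *
 * \param[in,out] rsc           Resource that operation is for
 * \param[in]     node          Node where operation was executed
 * \param[in]     xml_op        Operation history entry XML
 * \param[in]     task          Name of operation
 * \param[in]     rc            Return code of operation
 * \param[in]     last_failure  Resource's last_failure entry, if known
 * \param[in,out] on_fail       What should be done about a previous failure
 * \param[in]     data_set      Cluster working set
 */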
3448 static void
3449 update_resource_state(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op, const char *task, int rc,
3450                       xmlNode *last_failure, enum action_fail_response *on_fail, pe_working_set_t *data_set)
3451 {
3452     gboolean clear_past_failure = FALSE;
3453 
3454     CRM_ASSERT(rsc);
3455     CRM_ASSERT(xml_op);
3456 
3457     if (rc == PCMK_OCF_NOT_RUNNING) {
3458         clear_past_failure = TRUE;
3459 
3460     } else if (rc == PCMK_OCF_NOT_INSTALLED) {
3461         rsc->role = RSC_ROLE_STOPPED;
3462 
3463     } else if (pcmk__str_eq(task, CRMD_ACTION_STATUS, pcmk__str_casei)) {
3464         if (last_failure) {
3465             const char *op_key = get_op_key(xml_op);
3466             const char *last_failure_key = get_op_key(last_failure);
3467 
3468             if (pcmk__str_eq(op_key, last_failure_key, pcmk__str_casei)) {
3469                 clear_past_failure = TRUE;
3470             }
3471         }
3472 
3473         if (rsc->role < RSC_ROLE_STARTED) {
3474             set_active(rsc);
3475         }
3476 
3477     } else if (pcmk__str_eq(task, CRMD_ACTION_START, pcmk__str_casei)) {
3478         rsc->role = RSC_ROLE_STARTED;
3479         clear_past_failure = TRUE;
3480 
3481     } else if (pcmk__str_eq(task, CRMD_ACTION_STOP, pcmk__str_casei)) {
3482         rsc->role = RSC_ROLE_STOPPED;
3483         clear_past_failure = TRUE;
3484 
3485     } else if (pcmk__str_eq(task, CRMD_ACTION_PROMOTE, pcmk__str_casei)) {
3486         rsc->role = RSC_ROLE_MASTER;
3487         clear_past_failure = TRUE;
3488 
3489     } else if (pcmk__str_eq(task, CRMD_ACTION_DEMOTE, pcmk__str_casei)) {
3490 
3491         if (*on_fail == action_fail_demote) {
3492             // Demote clears an error only if on-fail=demote
3493             clear_past_failure = TRUE;
3494         }
3495         rsc->role = RSC_ROLE_SLAVE;
3496 
3497     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATED, pcmk__str_casei)) {
3498         rsc->role = RSC_ROLE_STARTED;
3499         clear_past_failure = TRUE;
3500 
3501     } else if (pcmk__str_eq(task, CRMD_ACTION_MIGRATE, pcmk__str_casei)) {
3502         unpack_migrate_to_success(rsc, node, xml_op, data_set);
3503 
3504     } else if (rsc->role < RSC_ROLE_STARTED) {
3505         pe_rsc_trace(rsc, "%s active on %s", rsc->id, node->details->uname);
3506         set_active(rsc);
3507     }
3508 
3509     /* clear any previous failure actions */
3510     if (clear_past_failure) {
3511         switch (*on_fail) {
3512             case action_fail_stop:
3513             case action_fail_fence:
3514             case action_fail_migrate:
3515             case action_fail_standby:
3516                 pe_rsc_trace(rsc, "%s.%s is not cleared by a completed stop",
3517                              rsc->id, fail2text(*on_fail));
3518                 break;
3519 
3520             case action_fail_block:
3521             case action_fail_ignore:
3522             case action_fail_demote:
3523             case action_fail_recover:
3524             case action_fail_restart_container:
3525                 *on_fail = action_fail_ignore;
3526                 rsc->next_role = RSC_ROLE_UNKNOWN;
3527                 break;
3528             case action_fail_reset_remote:
3529                 if (rsc->remote_reconnect_ms == 0) {
3530                     /* With no reconnect interval, the connection is allowed to
3531                      * start again after the remote node is fenced and
3532                      * completely stopped. (With a reconnect interval, we wait
3533                      * for the failure to be cleared entirely before attempting
3534                      * to reconnect.)
3535                      */
3536                     *on_fail = action_fail_ignore;
3537                     rsc->next_role = RSC_ROLE_UNKNOWN;
3538                 }
3539                 break;
3540         }
3541     }
3542 }
3543 
3544 /*!
3545  * \internal
3546  * \brief Remap informational monitor results to usual values
3547  *
3548  * Certain OCF result codes provide extended information to the user about
3549  * services that are degraded but not failed. Pacemaker must treat these as
3550  * the equivalent "normal" result.
3551  *
3552  * \param[in] rc        Actual result of a monitor action
3553  * \param[in] xml_op    Operation history XML
3554  * \param[in] node      Node that operation happened on
3555  * \param[in] rsc       Resource that operation happened to
3556  * \param[in] data_set  Cluster working set
3557  *
3558  * \return Result code that pacemaker should use
3559  *
3560  * \note If the result is remapped, and the node is not shutting down or failed,
3561  *       the operation will be recorded in the data set's list of failed
3562  *       operations, to highlight it for the user.
3563  */
3564 static int
3565 remap_monitor_rc(int rc, xmlNode *xml_op, const pe_node_t *node,
3566                  const pe_resource_t *rsc, pe_working_set_t *data_set)
3567 {
3568     int remapped_rc = pcmk__effective_rc(rc);
3569 
3570     if (rc != remapped_rc) {
3571         crm_trace("Remapping monitor result %d to %d", rc, remapped_rc);
3572         if (!node->details->shutdown || node->details->online) {
3573             record_failed_op(xml_op, node, rsc, data_set);
3574         }
3575     }
3576     return remapped_rc;
3577 }
3578 
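/*!
 * \internal
 * \brief Unpack a single operation history entry for a resource on a node
 *
 * Remap the entry's result as needed, then update the resource's role,
 * failure handling, and working-set state accordingly. A sketch of the kind
 * of entry this unpacks (attribute values here are hypothetical):
 *
 *   <lrm_rsc_op id="myrsc_monitor_10000" operation="monitor" call-id="12"
 *               rc-code="0" op-status="0" interval="10000"
 *               transition-key="3:7:0:01234567-89ab-cdef-0123-456789abcdef"/>
 *
 * \param[in,out] rsc           Resource that operation is for
 * \param[in]     node          Node where operation was executed
 * \param[in]     xml_op        Operation history entry XML
 * \param[in,out] last_failure  Where resource's last failure entry is recorded
 * \param[in,out] on_fail       What should be done about the result
 * \param[in]     data_set      Cluster working set
 */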
3579 static void
3580 unpack_rsc_op(pe_resource_t *rsc, pe_node_t *node, xmlNode *xml_op,
3581               xmlNode **last_failure, enum action_fail_response *on_fail,
3582               pe_working_set_t *data_set)
3583 {
3584     int rc = 0;
3585     int task_id = 0;
3586     int target_rc = 0;
3587     int status = PCMK_LRM_OP_UNKNOWN;
3588     guint interval_ms = 0;
3589     const char *task = NULL;
3590     const char *task_key = NULL;
3591     const char *exit_reason = NULL;
3592     bool expired = FALSE;
3593     pe_resource_t *parent = rsc;
3594     enum action_fail_response failure_strategy = action_fail_recover;
3595 
3596     CRM_CHECK(rsc && node && xml_op, return);
3597 
3598     target_rc = pe__target_rc_from_xml(xml_op);
3599     task_key = get_op_key(xml_op);
3600     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3601     exit_reason = crm_element_value(xml_op, XML_LRM_ATTR_EXIT_REASON);
3602     if (exit_reason == NULL) {
3603         exit_reason = "";
3604     }
3605 
3606     crm_element_value_int(xml_op, XML_LRM_ATTR_RC, &rc);
3607     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
3608     crm_element_value_int(xml_op, XML_LRM_ATTR_OPSTATUS, &status);
3609     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
3610 
3611     CRM_CHECK(task != NULL, return);
3612     CRM_CHECK(status <= PCMK_LRM_OP_INVALID, return);
3613     CRM_CHECK(status >= PCMK_LRM_OP_PENDING, return);
3614 
3615     if (!strcmp(task, CRMD_ACTION_NOTIFY) ||
3616         !strcmp(task, CRMD_ACTION_METADATA)) {
3617         /* safe to ignore these */
3618         return;
3619     }
3620 
3621     if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
3622         parent = uber_parent(rsc);
3623     }
3624 
3625     pe_rsc_trace(rsc, "Unpacking task %s/%s (call_id=%d, status=%d, rc=%d) on %s (role=%s)",
3626                  task_key, task, task_id, status, rc, node->details->uname, role2text(rsc->role));
3627 
3628     if (node->details->unclean) {
3629         pe_rsc_trace(rsc, "Node %s (where %s is running) is unclean."
3630                      " Further action depends on the value of the stop's on-fail attribute",
3631                      node->details->uname, rsc->id);
3632     }
3633 
3634     /* It should be possible to call remap_monitor_rc() first then call
3635      * check_operation_expiry() only if rc != target_rc, because there should
3636      * never be a fail count without at least one unexpected result in the
3637      * resource history. That would be more efficient by avoiding having to call
3638      * check_operation_expiry() for expected results.
3639      *
3640      * However, we do have such configurations in the scheduler regression
3641      * tests, even if it shouldn't be possible with the current code. It's
3642      * probably a good idea anyway, but that would require updating the test
3643      * inputs to something currently possible.
3644      */
3645 
3646     if ((status != PCMK_LRM_OP_NOT_INSTALLED)
3647         && check_operation_expiry(rsc, node, rc, xml_op, data_set)) {
3648         expired = TRUE;
3649     }
3650 
3651     if (!strcmp(task, CRMD_ACTION_STATUS)) {
3652         rc = remap_monitor_rc(rc, xml_op, node, rsc, data_set);
3653     }
3654 
3655     if (expired && (rc != target_rc)) {
3656         const char *magic = crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC);
3657 
3658         if (interval_ms == 0) {
3659             crm_notice("Ignoring expired %s failure on %s "
3660                        CRM_XS " actual=%d expected=%d magic=%s",
3661                        task_key, node->details->uname, rc, target_rc, magic);
3662             goto done;
3663 
3664         } else if (node->details->online && (node->details->unclean == FALSE)) {
3665             /* Reschedule the recurring monitor. CancelXmlOp() won't work at
3666              * this stage, so as a hacky workaround, forcibly change the restart
3667              * digest so check_action_definition() does what we want later.
3668              *
3669              * @TODO We should skip this if there is a newer successful monitor.
3670              *       Also, this causes rescheduling only if the history entry
3671              *       has an op-digest (which the expire-non-blocked-failure
3672              *       scheduler regression test doesn't, but that may not be a
3673              *       realistic scenario in production).
3674              */
3675             crm_notice("Rescheduling %s after failure expired on %s "
3676                        CRM_XS " actual=%d expected=%d magic=%s",
3677                        task_key, node->details->uname, rc, target_rc, magic);
3678             crm_xml_add(xml_op, XML_LRM_ATTR_RESTART_DIGEST, "calculated-failure-timeout");
3679             goto done;
3680         }
3681     }
3682 
3683     /* If the executor reported an operation status of anything but done or
3684      * error, consider that final. But for done or error, we know better whether
3685      * it should be treated as a failure or not, because we know the expected
3686      * result.
3687      */
3688     if ((status == PCMK_LRM_OP_DONE) || (status == PCMK_LRM_OP_ERROR)) {
3689         status = determine_op_status(rsc, rc, target_rc, node, xml_op, on_fail, data_set);
3690         pe_rsc_trace(rsc, "Remapped %s status to %d", task_key, status);
3691     }
3692 
3693     switch (status) {
3694         case PCMK_LRM_OP_CANCELLED:
3695             // Should never happen
3696             pe_err("Resource history contains cancellation '%s' "
3697                    "(%s of %s on %s at %s)",
3698                    ID(xml_op), task, rsc->id, node->details->uname,
3699                    last_change_str(xml_op));
3700             break;
3701 
3702         case PCMK_LRM_OP_PENDING:
3703             if (!strcmp(task, CRMD_ACTION_START)) {
3704                 pe__set_resource_flags(rsc, pe_rsc_start_pending);
3705                 set_active(rsc);
3706 
3707             } else if (!strcmp(task, CRMD_ACTION_PROMOTE)) {
3708                 rsc->role = RSC_ROLE_MASTER;
3709 
3710             } else if (!strcmp(task, CRMD_ACTION_MIGRATE) && node->details->unclean) {
3711                 /* If a pending migrate_to action is out on an unclean node,
3712                  * we have to force a stop action on the target. */
3713                 const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
3714                 pe_node_t *target = pe_find_node(data_set->nodes, migrate_target);
3715                 if (target) {
3716                     stop_action(rsc, target, FALSE);
3717                 }
3718             }
3719 
3720             if (rsc->pending_task == NULL) {
3721                 if ((interval_ms != 0) || strcmp(task, CRMD_ACTION_STATUS)) {
3722                     rsc->pending_task = strdup(task);
3723                     rsc->pending_node = node;
3724                 } else {
3725                     /* Pending probes are not printed, even if pending
3726                      * operations are requested. If someone ever requests that
3727                      * behavior, enable the below and the corresponding part of
3728                      * native.c:native_pending_task().
3729                      */
3730 #if 0
3731                     rsc->pending_task = strdup("probe");
3732                     rsc->pending_node = node;
3733 #endif
3734                 }
3735             }
3736             break;
3737 
3738         case PCMK_LRM_OP_DONE:
3739             pe_rsc_trace(rsc, "%s of %s on %s completed at %s " CRM_XS " id=%s",
3740                          task, rsc->id, node->details->uname,
3741                          last_change_str(xml_op), ID(xml_op));
3742             update_resource_state(rsc, node, xml_op, task, rc, *last_failure, on_fail, data_set);
3743             break;
3744 
3745         case PCMK_LRM_OP_NOT_INSTALLED:
3746             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3747             if (failure_strategy == action_fail_ignore) {
3748                 crm_warn("Cannot ignore failed %s of %s on %s: "
3749                          "Resource agent doesn't exist "
3750                          CRM_XS " status=%d rc=%d id=%s",
3751                          task, rsc->id, node->details->uname, status, rc,
3752                          ID(xml_op));
3753                 /* Also ensures the resource gets marked as failed (pe_rsc_failed) later, so it is displayed as "FAILED" */
3754                 *on_fail = action_fail_migrate;
3755             }
3756             resource_location(parent, node, -INFINITY, "hard-error", data_set);
3757             unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
3758             break;
3759 
3760         case PCMK_LRM_OP_NOT_CONNECTED:
3761             if (pe__is_guest_or_remote_node(node)
3762                 && pcmk_is_set(node->details->remote_rsc->flags, pe_rsc_managed)) {
3763                 /* We should never get into a situation where a managed remote
3764                  * connection resource is considered OK but a resource action
3765                  * behind the connection gets a "not connected" status. But as a
3766                  * fail-safe in case a bug or unusual circumstances do lead to
3767                  * that, ensure the remote connection is considered failed.
3768                  */
3769                 pe__set_resource_flags(node->details->remote_rsc,
3770                                        pe_rsc_failed|pe_rsc_stop);
3771             }
3772 
3773             // fall through
3774 
3775         case PCMK_LRM_OP_ERROR:
3776         case PCMK_LRM_OP_ERROR_HARD:
3777         case PCMK_LRM_OP_ERROR_FATAL:
3778         case PCMK_LRM_OP_TIMEOUT:
3779         case PCMK_LRM_OP_NOTSUPPORTED:
3780         case PCMK_LRM_OP_INVALID:
3781 
3782             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3783             if ((failure_strategy == action_fail_ignore)
3784                 || (failure_strategy == action_fail_restart_container
3785                     && !strcmp(task, CRMD_ACTION_STOP))) {
3786 
3787                 crm_warn("Pretending failed %s (%s%s%s) of %s on %s at %s "
3788                          "succeeded " CRM_XS " rc=%d id=%s",
3789                          task, services_ocf_exitcode_str(rc),
3790                          (*exit_reason? ": " : ""), exit_reason, rsc->id,
3791                          node->details->uname, last_change_str(xml_op), rc,
3792                          ID(xml_op));
3793 
3794                 update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure, on_fail, data_set);
3795                 crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
3796                 pe__set_resource_flags(rsc, pe_rsc_failure_ignored);
3797 
3798                 record_failed_op(xml_op, node, rsc, data_set);
3799 
3800                 if ((failure_strategy == action_fail_restart_container)
3801                     && cmp_on_fail(*on_fail, action_fail_recover) <= 0) {
3802                     *on_fail = failure_strategy;
3803                 }
3804 
3805             } else {
3806                 unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
3807 
3808                 if (status == PCMK_LRM_OP_ERROR_HARD) {
3809                     do_crm_log((rc != PCMK_OCF_NOT_INSTALLED)? LOG_ERR : LOG_NOTICE,
3810                                "Preventing %s from restarting on %s because "
3811                                "of hard failure (%s%s%s)" CRM_XS " rc=%d id=%s",
3812                                parent->id, node->details->uname,
3813                                services_ocf_exitcode_str(rc),
3814                                (*exit_reason? ": " : ""), exit_reason,
3815                                rc, ID(xml_op));
3816                     resource_location(parent, node, -INFINITY, "hard-error", data_set);
3817 
3818                 } else if (status == PCMK_LRM_OP_ERROR_FATAL) {
3819                     crm_err("Preventing %s from restarting anywhere because "
3820                             "of fatal failure (%s%s%s) " CRM_XS " rc=%d id=%s",
3821                             parent->id, services_ocf_exitcode_str(rc),
3822                             (*exit_reason? ": " : ""), exit_reason,
3823                             rc, ID(xml_op));
3824                     resource_location(parent, NULL, -INFINITY, "fatal-error", data_set);
3825                 }
3826             }
3827             break;
3828     }
3829 
3830   done:
3831     pe_rsc_trace(rsc, "Resource %s after %s: role=%s, next=%s",
3832                  rsc->id, task, role2text(rsc->role),
3833                  role2text(rsc->next_role));
3834 }
3835 
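     /*!
      * \internal
      * \brief Populate a node's attribute table
      *
      * Add the built-in attributes (CRM_ATTR_UNAME, CRM_ATTR_ID, CRM_ATTR_IS_DC,
      * and, when available, CRM_ATTR_CLUSTER_NAME and CRM_ATTR_SITE_NAME), then
      * unpack any instance attributes found in \p xml_obj. The DC node is also
      * flagged here as a side effect.
      *
      * \param[in]     xml_obj    XML possibly containing node instance attributes
      * \param[in,out] node       Node whose attributes are being set
      * \param[in]     overwrite  Whether unpacked values may replace existing ones
      * \param[in,out] data_set   Cluster working set
      */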
3836 static void
3837 add_node_attrs(xmlNode *xml_obj, pe_node_t *node, bool overwrite,
3838                pe_working_set_t *data_set)
3839 {
3840     const char *cluster_name = NULL;
3841 
3842     pe_rule_eval_data_t rule_data = {
3843         .node_hash = NULL,
3844         .role = RSC_ROLE_UNKNOWN,
3845         .now = data_set->now,
3846         .match_data = NULL,
3847         .rsc_data = NULL,
3848         .op_data = NULL
3849     };
3850 
3851     g_hash_table_insert(node->details->attrs,
3852                         strdup(CRM_ATTR_UNAME), strdup(node->details->uname));
3853 
3854     g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_ID),
3855                         strdup(node->details->id));
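     /* Flag the DC both in the working set and as a node attribute */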
3856     if (pcmk__str_eq(node->details->id, data_set->dc_uuid, pcmk__str_casei)) {
3857         data_set->dc_node = node;
3858         node->details->is_dc = TRUE;
3859         g_hash_table_insert(node->details->attrs,
3860                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_TRUE));
3861     } else {
3862         g_hash_table_insert(node->details->attrs,
3863                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_FALSE));
3864     }
3865 
3866     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
3867     if (cluster_name) {
3868         g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_CLUSTER_NAME),
3869                             strdup(cluster_name));
3870     }
3871 
3872     pe__unpack_dataset_nvpairs(xml_obj, XML_TAG_ATTR_SETS, &rule_data,
3873                                node->details->attrs, NULL, overwrite, data_set);
3874 
3875     if (pe_node_attribute_raw(node, CRM_ATTR_SITE_NAME) == NULL) {
3876         const char *site_name = pe_node_attribute_raw(node, "site-name");
3877 
3878         if (site_name) {
3879             g_hash_table_insert(node->details->attrs,
3880                                 strdup(CRM_ATTR_SITE_NAME),
3881                                 strdup(site_name));
3882 
3883         } else if (cluster_name) {
3884             /* Default to cluster-name if unset */
3885             g_hash_table_insert(node->details->attrs,
3886                                 strdup(CRM_ATTR_SITE_NAME),
3887                                 strdup(cluster_name));
3888         }
3889     }
3890 }
3891 
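     /*!
      * \internal
      * \brief Extract a resource's operation history entries from lrm_resource XML
      *
      * Collect all lrm_rsc_op children of \p rsc_entry, tag each with the given
      * resource ID and node name, and sort them by call ID. If \p active_filter
      * is true, only operations from the resource's most recent start onward are
      * returned.
      *
      * \param[in] node           Name of node the operations ran on
      * \param[in] rsc            ID of resource the operations belong to
      * \param[in] rsc_entry      lrm_resource XML containing the operations
      * \param[in] active_filter  Whether to return only active operations
      *
      * \return List of XML operation entries
      */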
3892 static GListPtr
3893 extract_operations(const char *node, const char *rsc, xmlNode * rsc_entry, gboolean active_filter)
3894 {
3895     int counter = -1;
3896     int stop_index = -1;
3897     int start_index = -1;
3898 
3899     xmlNode *rsc_op = NULL;
3900 
3901     GListPtr gIter = NULL;
3902     GListPtr op_list = NULL;
3903     GListPtr sorted_op_list = NULL;
3904 
3905     /* Collect the operation history entries, tagging each with the resource
3906      * ID and node name so that callers have that context
3907      */
3908 
3909     for (rsc_op = pcmk__xe_first_child(rsc_entry);
3910          rsc_op != NULL; rsc_op = pcmk__xe_next(rsc_op)) {
3911 
3912         if (pcmk__str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP,
3913                          pcmk__str_none)) {
3914             crm_xml_add(rsc_op, "resource", rsc);
3915             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
3916             op_list = g_list_prepend(op_list, rsc_op);
3917         }
3918     }
3919 
3920     if (op_list == NULL) {
3921         /* if there are no operations, there is nothing to do */
3922         return NULL;
3923     }
3924 
3925     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
3926 
3927     /* If the caller wants all operations, return the full sorted list */
3928     if (active_filter == FALSE) {
3929         return sorted_op_list;
3930     }
3931 
3932     op_list = NULL;
3933 
3934     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
3935 
3936     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
3937         xmlNode *rsc_op = (xmlNode *) gIter->data;
3938 
3939         counter++;
3940 
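             /* calculate_active_ops() located the most recent start and stop: if
              * a stop sorted after the newest start, the resource is no longer
              * active, so none of its operations qualify
              */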
3941         if (start_index < stop_index) {
3942             crm_trace("Skipping %s: not active", ID(rsc_entry));
3943             break;
3944 
3945         } else if (counter < start_index) {
3946             crm_trace("Skipping %s: old", ID(rsc_op));
3947             continue;
3948         }
3949         op_list = g_list_append(op_list, rsc_op);
3950     }
3951 
3952     g_list_free(sorted_op_list);
3953     return op_list;
3954 }
3955 
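     /*!
      * \brief Find operation history entries in a working set's status XML
      *
      * Walk each node_state entry in the CIB status section, determine that
      * node's online status, and, for nodes that are online (or any node when
      * fencing is enabled), collect the operation history of each resource,
      * optionally restricted to a single node and/or resource.
      *
      * As a hypothetical usage sketch, a caller could gather every recorded
      * operation cluster-wide, then free only the list afterward (the entries
      * themselves are borrowed from data_set->input):
      *
      *     GListPtr ops = find_operations(NULL, NULL, FALSE, data_set);
      *
      *     for (GListPtr iter = ops; iter != NULL; iter = iter->next) {
      *         xmlNode *op = (xmlNode *) iter->data;
      *
      *         crm_info("Found operation %s", ID(op));
      *     }
      *     g_list_free(ops);
      *
      * \param[in]     rsc            If not NULL, include only this resource ID
      * \param[in]     node           If not NULL, include only this node name
      * \param[in]     active_filter  Whether to include only active operations
      * \param[in,out] data_set       Cluster working set
      *
      * \return List of XML operation entries
      */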
3956 GListPtr
3957 find_operations(const char *rsc, const char *node, gboolean active_filter,
3958                 pe_working_set_t * data_set)
3959 {
3960     GListPtr output = NULL;
3961     GListPtr intermediate = NULL;
3962 
3963     xmlNode *tmp = NULL;
3964     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
3965 
3966     pe_node_t *this_node = NULL;
3967 
3968     xmlNode *node_state = NULL;
3969 
3970     for (node_state = pcmk__xe_first_child(status); node_state != NULL;
3971          node_state = pcmk__xe_next(node_state)) {
3972 
3973         if (pcmk__str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, pcmk__str_none)) {
3974             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
3975 
3976             if (node != NULL && !pcmk__str_eq(uname, node, pcmk__str_casei)) {
3977                 continue;
3978             }
3979 
3980             this_node = pe_find_node(data_set->nodes, uname);
3981             if (this_node == NULL) {
3982                 CRM_LOG_ASSERT(this_node != NULL);
3983                 continue;
3984 
3985             } else if (pe__is_guest_or_remote_node(this_node)) {
3986                 determine_remote_online_status(data_set, this_node);
3987 
3988             } else {
3989                 determine_online_status(node_state, this_node, data_set);
3990             }
3991 
3992             if (this_node->details->online
3993                 || pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
3994                 /* Offline nodes run no resources, unless fencing is enabled,
3995                  * in which case we need to make sure resource start events
3996                  * are ordered after the fencing
3997                  */
3998                 xmlNode *lrm_rsc = NULL;
3999 
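                     /* Descend into this node's lrm/lrm_resources section */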
4000                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
4001                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
4002 
4003                 for (lrm_rsc = pcmk__xe_first_child(tmp); lrm_rsc != NULL;
4004                      lrm_rsc = pcmk__xe_next(lrm_rsc)) {
4005 
4006                     if (pcmk__str_eq((const char *)lrm_rsc->name,
4007                                      XML_LRM_TAG_RESOURCE, pcmk__str_none)) {
4008 
4009                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
4010 
4011                         if (rsc != NULL && !pcmk__str_eq(rsc_id, rsc, pcmk__str_casei)) {
4012                             continue;
4013                         }
4014 
4015                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
4016                         output = g_list_concat(output, intermediate);
4017                     }
4018                 }
4019             }
4020         }
4021     }
4022 
4023     return output;
4024 }
