root/lib/pengine/unpack.c


DEFINITIONS

This source file includes the following definitions.
  1. is_dangling_container_remote_node
  2. pe_fence_node
  3. set_if_xpath
  4. unpack_config
  5. destroy_digest_cache
  6. pe_create_node
  7. remote_id_conflict
  8. expand_remote_rsc_meta
  9. handle_startup_fencing
  10. unpack_nodes
  11. setup_container
  12. unpack_remote_nodes
  13. link_rsc2remotenode
  14. destroy_tag
  15. unpack_resources
  16. unpack_tags
  17. unpack_ticket_state
  18. unpack_tickets_state
  19. get_ticket_state_legacy
  20. unpack_handle_remote_attrs
  21. unpack_node_loop
  22. unpack_status
  23. determine_online_status_no_fencing
  24. determine_online_status_fencing
  25. determine_remote_online_status
  26. determine_online_status
  27. pe_base_name_end
  28. clone_strip
  29. clone_zero
  30. create_fake_resource
  31. find_anonymous_clone
  32. unpack_find_resource
  33. process_orphan_resource
  34. process_rsc_state
  35. process_recurring
  36. calculate_active_ops
  37. unpack_lrm_rsc_state
  38. handle_orphaned_container_fillers
  39. unpack_lrm_resources
  40. set_active
  41. set_node_score
  42. find_lrm_op
  43. unpack_rsc_migration
  44. unpack_rsc_migration_failure
  45. record_failed_op
  46. get_op_key
  47. unpack_rsc_op_failure
  48. determine_op_status
  49. check_operation_expiry
  50. get_target_rc
  51. get_action_on_fail
  52. update_resource_state
  53. unpack_rsc_op
  54. add_node_attrs
  55. extract_operations
  56. find_operations

   1 /*
   2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
   3  *
   4  * This library is free software; you can redistribute it and/or
   5  * modify it under the terms of the GNU Lesser General Public
   6  * License as published by the Free Software Foundation; either
   7  * version 2.1 of the License, or (at your option) any later version.
   8  *
   9  * This library is distributed in the hope that it will be useful,
  10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  12  * Lesser General Public License for more details.
  13  *
  14  * You should have received a copy of the GNU Lesser General Public
  15  * License along with this library; if not, write to the Free Software
  16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  17  */
  18 #include <crm_internal.h>
  19 
  20 #include <glib.h>
  21 
  22 #include <crm/crm.h>
  23 #include <crm/services.h>
  24 #include <crm/msg_xml.h>
  25 #include <crm/common/xml.h>
  26 
  27 #include <crm/common/util.h>
  28 #include <crm/pengine/rules.h>
  29 #include <crm/pengine/internal.h>
  30 #include <unpack.h>
  31 
  32 CRM_TRACE_INIT_DATA(pe_status);
  33 
  34 #define set_config_flag(data_set, option, flag) do {                    \
  35         const char *tmp = pe_pref(data_set->config_hash, option);       \
  36         if(tmp) {                                                       \
  37             if(crm_is_true(tmp)) {                                      \
  38                 set_bit(data_set->flags, flag);                 \
  39             } else {                                                    \
  40                 clear_bit(data_set->flags, flag);               \
  41             }                                                           \
  42         }                                                               \
  43     } while(0)
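
/* For illustration (mirroring calls made later in this file): set_config_flag()
 * looks the option up in data_set->config_hash and, if it is set, sets or
 * clears the given flag; if the option is unset, the flag is left untouched.
 *
 *     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 */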
  44 
  45 gboolean unpack_rsc_op(resource_t * rsc, node_t * node, xmlNode * xml_op, xmlNode ** last_failure,
  46                        enum action_fail_response *failed, pe_working_set_t * data_set);
  47 static gboolean determine_remote_online_status(pe_working_set_t * data_set, node_t * this_node);
  48 
  49 // Bitmask for warnings we only want to print once
  50 uint32_t pe_wo = 0;
  51 
  52 static gboolean
  53 is_dangling_container_remote_node(node_t *node)
  54 {
  55     /* we are looking for a remote-node that was supposed to be mapped to a
  56      * container resource, but all traces of that container have disappeared 
  57      * from both the config and the status section. */
  58     if (is_remote_node(node) &&
  59         node->details->remote_rsc &&
  60         node->details->remote_rsc->container == NULL &&
  61         is_set(node->details->remote_rsc->flags, pe_rsc_orphan_container_filler)) {
  62         return TRUE;
  63     }
  64 
  65     return FALSE;
  66 }
  67 
  68 
  69 /*!
  70  * \brief Schedule a fence action for a node
  71  *
  72  * \param[in,out] data_set  Current working set of cluster
  73  * \param[in,out] node      Node to fence
  74  * \param[in]     reason    Text description of why fencing is needed
  75  */
  76 void
  77 pe_fence_node(pe_working_set_t * data_set, node_t * node, const char *reason)
  78 {
  79     CRM_CHECK(node, return);
  80 
  81     /* A guest node is fenced by marking its container as failed */
  82     if (is_container_remote_node(node)) {
  83         resource_t *rsc = node->details->remote_rsc->container;
  84 
  85         if (is_set(rsc->flags, pe_rsc_failed) == FALSE) {
  86             if (!is_set(rsc->flags, pe_rsc_managed)) {
  87                 crm_notice("Not fencing guest node %s "
  88                            "(otherwise would because %s): "
  89                            "its guest resource %s is unmanaged",
  90                            node->details->uname, reason, rsc->id);
  91             } else {
  92                 crm_warn("Guest node %s will be fenced "
  93                          "(by recovering its guest resource %s): %s",
  94                          node->details->uname, rsc->id, reason);
  95 
  96                 /* We don't mark the node as unclean because that would prevent the
  97                  * node from running resources. We want to allow it to run resources
  98                  * in this transition if the recovery succeeds.
  99                  */
 100                 node->details->remote_requires_reset = TRUE;
 101                 set_bit(rsc->flags, pe_rsc_failed);
 102             }
 103         }
 104 
 105     } else if (is_dangling_container_remote_node(node)) {
 106         crm_info("Cleaning up dangling connection for guest node %s: "
 107                  "fencing was already done because %s, "
 108                  "and guest resource no longer exists",
 109                  node->details->uname, reason);
 110         set_bit(node->details->remote_rsc->flags, pe_rsc_failed);
 111 
 112     } else if (is_baremetal_remote_node(node)) {
 113         resource_t *rsc = node->details->remote_rsc;
 114 
 115         if (rsc && (!is_set(rsc->flags, pe_rsc_managed))) {
 116             crm_notice("Not fencing remote node %s "
 117                        "(otherwise would because %s): connection is unmanaged",
 118                        node->details->uname, reason);
 119         } else if(node->details->remote_requires_reset == FALSE) {
 120             node->details->remote_requires_reset = TRUE;
 121             crm_warn("Remote node %s %s: %s",
 122                      node->details->uname,
 123                      pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 124                      reason);
 125         }
 126         node->details->unclean = TRUE;
 127         pe_fence_op(node, NULL, TRUE, reason, data_set);
 128 
 129     } else if (node->details->unclean) {
 130         crm_trace("Cluster node %s %s because %s",
 131                   node->details->uname,
 132                   pe_can_fence(data_set, node)? "would also be fenced" : "also is unclean",
 133                   reason);
 134 
 135     } else {
 136         crm_warn("Cluster node %s %s: %s",
 137                  node->details->uname,
 138                  pe_can_fence(data_set, node)? "will be fenced" : "is unclean",
 139                  reason);
 140         node->details->unclean = TRUE;
 141         pe_fence_op(node, NULL, TRUE, reason, data_set);
 142     }
 143 }
 144 
 145 // @TODO xpaths can't handle templates, rules, or id-refs
 146 
 147 // nvpair with provides or requires set to unfencing
 148 #define XPATH_UNFENCING_NVPAIR XML_CIB_TAG_NVPAIR                \
 149     "[(@" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_PROVIDES "'"    \
 150     "or @" XML_NVPAIR_ATTR_NAME "='" XML_RSC_ATTR_REQUIRES "') " \
 151     "and @" XML_NVPAIR_ATTR_VALUE "='unfencing']"
 152 
 153 // unfencing in rsc_defaults or any resource
 154 #define XPATH_ENABLE_UNFENCING \
 155     "/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RESOURCES   \
 156     "//" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR                                               \
 157     "|/" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_RSCCONFIG  \
 158     "/" XML_TAG_META_SETS "/" XPATH_UNFENCING_NVPAIR
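
/* A sketch (not from any particular CIB, and assuming the usual expansions of
 * the XML_* constants above) of the kind of fragment these XPath expressions
 * are intended to match: an nvpair named "provides" or "requires" with the
 * value "unfencing", inside a meta_attributes set under either rsc_defaults
 * or a resource:
 *
 *     <meta_attributes id="example-meta">
 *       <nvpair id="example-requires" name="requires" value="unfencing"/>
 *     </meta_attributes>
 *
 * As the @TODO above notes, templates, rules, and id-refs are not handled.
 */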
 159 
 160 static
 161 void set_if_xpath(unsigned long long flag, const char *xpath,
 162                   pe_working_set_t *data_set)
 163 {
 164     xmlXPathObjectPtr result = NULL;
 165 
 166     if (is_not_set(data_set->flags, flag)) {
 167         result = xpath_search(data_set->input, xpath);
 168         if (result && (numXpathResults(result) > 0)) {
 169             set_bit(data_set->flags, flag);
 170         }
 171         freeXpathObject(result);
 172     }
 173 }
 174 
 175 gboolean
 176 unpack_config(xmlNode * config, pe_working_set_t * data_set)
 177 {
 178     const char *value = NULL;
 179     GHashTable *config_hash = crm_str_table_new();
 180 
 181     data_set->config_hash = config_hash;
 182 
 183     unpack_instance_attributes(data_set->input, config, XML_CIB_TAG_PROPSET, NULL, config_hash,
 184                                CIB_OPTIONS_FIRST, FALSE, data_set->now);
 185 
 186     verify_pe_options(data_set->config_hash);
 187 
 188     set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
 189     if(is_not_set(data_set->flags, pe_flag_startup_probes)) {
 190         crm_info("Startup probes: disabled (dangerous)");
 191     }
 192 
 193     value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
 194     if (value && crm_is_true(value)) {
 195         crm_notice("Watchdog will be used via SBD if fencing is required");
 196         set_bit(data_set->flags, pe_flag_have_stonith_resource);
 197     }
 198 
 199     /* Set certain flags via xpath here, so they can be used before the relevant
 200      * configuration sections are unpacked.
 201      */
 202     set_if_xpath(pe_flag_enable_unfencing, XPATH_ENABLE_UNFENCING, data_set);
 203 
 204     value = pe_pref(data_set->config_hash, "stonith-timeout");
 205     data_set->stonith_timeout = crm_get_msec(value);
 206     crm_debug("STONITH timeout: %d", data_set->stonith_timeout);
 207 
 208     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
 209     crm_debug("STONITH of failed nodes is %s",
 210               is_set(data_set->flags, pe_flag_stonith_enabled) ? "enabled" : "disabled");
 211 
 212     data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
 213     crm_trace("STONITH will %s nodes", data_set->stonith_action);
 214 
 215     set_config_flag(data_set, "concurrent-fencing", pe_flag_concurrent_fencing);
 216     crm_debug("Concurrent fencing is %s",
 217               is_set(data_set->flags, pe_flag_concurrent_fencing) ? "enabled" : "disabled");
 218 
 219     set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
 220     crm_debug("Stop all active resources: %s",
 221               is_set(data_set->flags, pe_flag_stop_everything) ? "true" : "false");
 222 
 223     set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
 224     if (is_set(data_set->flags, pe_flag_symmetric_cluster)) {
 225         crm_debug("Cluster is symmetric" " - resources can run anywhere by default");
 226     }
 227 
 228     value = pe_pref(data_set->config_hash, "default-resource-stickiness");
 229     if (value) {
 230         pe_warn_once(pe_wo_default_stick,
 231                      "Support for 'default-resource-stickiness' cluster property"
 232                      " is deprecated and will be removed in a future release"
 233                      " (use resource-stickiness in rsc_defaults instead)");
 234     }
 235     data_set->default_resource_stickiness = char2score(value);
 236     crm_debug("Default stickiness: %d", data_set->default_resource_stickiness);
 237 
 238     value = pe_pref(data_set->config_hash, "no-quorum-policy");
 239 
 240     if (safe_str_eq(value, "ignore")) {
 241         data_set->no_quorum_policy = no_quorum_ignore;
 242 
 243     } else if (safe_str_eq(value, "freeze")) {
 244         data_set->no_quorum_policy = no_quorum_freeze;
 245 
 246     } else if (safe_str_eq(value, "suicide")) {
 247         if (is_set(data_set->flags, pe_flag_stonith_enabled)) {
 248             int do_panic = 0;
 249 
 250             crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC,
 251                                   &do_panic);
 252             if (do_panic || is_set(data_set->flags, pe_flag_have_quorum)) {
 253                 data_set->no_quorum_policy = no_quorum_suicide;
 254             } else {
 255                 crm_notice("Resetting no-quorum-policy to 'stop': cluster has never had quorum");
 256                 data_set->no_quorum_policy = no_quorum_stop;
 257             }
 258         } else {
 259             crm_config_err("Resetting no-quorum-policy to 'stop': stonith is not configured");
 260             data_set->no_quorum_policy = no_quorum_stop;
 261         }
 262 
 263     } else {
 264         data_set->no_quorum_policy = no_quorum_stop;
 265     }
 266 
 267     switch (data_set->no_quorum_policy) {
 268         case no_quorum_freeze:
 269             crm_debug("On loss of CCM Quorum: Freeze resources");
 270             break;
 271         case no_quorum_stop:
 272             crm_debug("On loss of CCM Quorum: Stop ALL resources");
 273             break;
 274         case no_quorum_suicide:
 275             crm_notice("On loss of CCM Quorum: Fence all remaining nodes");
 276             break;
 277         case no_quorum_ignore:
 278             crm_notice("On loss of CCM Quorum: Ignore");
 279             break;
 280     }
 281 
 282     set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
 283     crm_trace("Orphan resources are %s",
 284               is_set(data_set->flags, pe_flag_stop_rsc_orphans) ? "stopped" : "ignored");
 285 
 286     set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
 287     crm_trace("Orphan resource actions are %s",
 288               is_set(data_set->flags, pe_flag_stop_action_orphans) ? "stopped" : "ignored");
 289 
 290     set_config_flag(data_set, "remove-after-stop", pe_flag_remove_after_stop);
 291     crm_trace("Stopped resources are removed from the status section: %s",
 292               is_set(data_set->flags, pe_flag_remove_after_stop) ? "true" : "false");
 293 
 294     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
 295     crm_trace("Maintenance mode: %s",
 296               is_set(data_set->flags, pe_flag_maintenance_mode) ? "true" : "false");
 297 
 298     if (is_set(data_set->flags, pe_flag_maintenance_mode)) {
 299         clear_bit(data_set->flags, pe_flag_is_managed_default);
 300     } else if (pe_pref(data_set->config_hash, "is-managed-default")) {
 301         set_config_flag(data_set, "is-managed-default", pe_flag_is_managed_default);
 302         pe_warn_once(pe_wo_default_isman,
 303                      "Support for 'is-managed-default' cluster property"
 304                      " is deprecated and will be removed in a future release"
 305                      " (use is-managed in rsc_defaults instead)");
 306     }
 307     crm_trace("By default resources are %smanaged",
 308               is_set(data_set->flags, pe_flag_is_managed_default) ? "" : "not ");
 309 
 310     set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
 311     crm_trace("Start failures are %s",
 312               is_set(data_set->flags,
 313                      pe_flag_start_failure_fatal) ? "always fatal" : "handled by failcount");
 314 
 315     if (is_set(data_set->flags, pe_flag_stonith_enabled)) {
 316         set_config_flag(data_set, "startup-fencing", pe_flag_startup_fencing);
 317     }
 318     if (is_set(data_set->flags, pe_flag_startup_fencing)) {
 319         crm_trace("Unseen nodes will be fenced");
 320     } else {
 321         pe_warn_once(pe_wo_blind, "Blind faith: not fencing unseen nodes");
 322     }
 323 
 324     node_score_red = char2score(pe_pref(data_set->config_hash, "node-health-red"));
 325     node_score_green = char2score(pe_pref(data_set->config_hash, "node-health-green"));
 326     node_score_yellow = char2score(pe_pref(data_set->config_hash, "node-health-yellow"));
 327 
 328     crm_debug("Node scores: 'red' = %s, 'yellow' = %s, 'green' = %s",
 329              pe_pref(data_set->config_hash, "node-health-red"),
 330              pe_pref(data_set->config_hash, "node-health-yellow"),
 331              pe_pref(data_set->config_hash, "node-health-green"));
 332 
 333     data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
 334     crm_trace("Placement strategy: %s", data_set->placement_strategy);
 335 
 336     return TRUE;
 337 }
 338 
 339 static void
 340 destroy_digest_cache(gpointer ptr)
 341 {
 342     op_digest_cache_t *data = ptr;
 343 
 344     free_xml(data->params_all);
 345     free_xml(data->params_secure);
 346     free_xml(data->params_restart);
 347 
 348     free(data->digest_all_calc);
 349     free(data->digest_restart_calc);
 350     free(data->digest_secure_calc);
 351 
 352     free(data);
 353 }
 354 
 355 node_t *
 356 pe_create_node(const char *id, const char *uname, const char *type,
 357                const char *score, pe_working_set_t * data_set)
 358 {
 359     node_t *new_node = NULL;
 360 
 361     if (pe_find_node(data_set->nodes, uname) != NULL) {
 362         crm_config_warn("Detected multiple node entries with uname=%s"
 363                         " - this is rarely intended", uname);
 364     }
 365 
 366     new_node = calloc(1, sizeof(node_t));
 367     if (new_node == NULL) {
 368         return NULL;
 369     }
 370 
 371     new_node->weight = char2score(score);
 372     new_node->fixed = FALSE;
 373     new_node->details = calloc(1, sizeof(struct node_shared_s));
 374 
 375     if (new_node->details == NULL) {
 376         free(new_node);
 377         return NULL;
 378     }
 379 
 380     crm_trace("Creating node for entry %s/%s", uname, id);
 381     new_node->details->id = id;
 382     new_node->details->uname = uname;
 383     new_node->details->online = FALSE;
 384     new_node->details->shutdown = FALSE;
 385     new_node->details->rsc_discovery_enabled = TRUE;
 386     new_node->details->running_rsc = NULL;
 387     new_node->details->type = node_ping;
 388 
 389     if (safe_str_eq(type, "remote")) {
 390         new_node->details->type = node_remote;
 391         set_bit(data_set->flags, pe_flag_have_remote_nodes);
 392     } else if (type == NULL || safe_str_eq(type, "member")
 393         || safe_str_eq(type, NORMALNODE)) {
 394         new_node->details->type = node_member;
 395     }
 396 
 397     new_node->details->attrs = crm_str_table_new();
 398 
 399     if (is_remote_node(new_node)) {
 400         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 401                             strdup("remote"));
 402     } else {
 403         g_hash_table_insert(new_node->details->attrs, strdup(CRM_ATTR_KIND),
 404                             strdup("cluster"));
 405     }
 406 
 407     new_node->details->utilization = crm_str_table_new();
 408 
 409     new_node->details->digest_cache =
 410         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str,
 411                               destroy_digest_cache);
 412 
 413     data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node, sort_node_uname);
 414     return new_node;
 415 }
 416 
 417 bool
 418 remote_id_conflict(const char *remote_name, pe_working_set_t *data) 
 419 {
 420     bool match = FALSE;
 421 #if 1
  422     match = (pe_find_resource(data->resources, remote_name) != NULL);
 423 #else
 424     if (data->name_check == NULL) {
 425         data->name_check = g_hash_table_new(crm_str_hash, g_str_equal);
 426         for (xml_rsc = __xml_first_child(parent); xml_rsc != NULL; xml_rsc = __xml_next_element(xml_rsc)) {
 427             const char *id = ID(xml_rsc);
 428 
  429             /* Avoid heap allocation here, because the lifetime of this hash table lets us reference the XML strings directly */
 430             g_hash_table_insert(data->name_check, (char *) id, (char *) id);
 431         }
 432     }
 433     if (g_hash_table_lookup(data->name_check, remote_name)) {
 434         match = TRUE;
 435     }
 436 #endif
 437     if (match) {
 438         crm_err("Invalid remote-node name, a resource called '%s' already exists.", remote_name);
  439         return TRUE;
 440     }
 441 
 442     return match;
 443 }
 444 
 445 
 446 static const char *
 447 expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, pe_working_set_t *data)
 448 {
 449     xmlNode *attr_set = NULL;
 450     xmlNode *attr = NULL;
 451 
 452     const char *container_id = ID(xml_obj);
 453     const char *remote_name = NULL;
 454     const char *remote_server = NULL;
 455     const char *remote_port = NULL;
 456     const char *connect_timeout = "60s";
 457     const char *remote_allow_migrate=NULL;
 458     const char *container_managed = NULL;
 459 
 460     for (attr_set = __xml_first_child(xml_obj); attr_set != NULL; attr_set = __xml_next_element(attr_set)) {
 461         if (safe_str_neq((const char *)attr_set->name, XML_TAG_META_SETS)) {
 462             continue;
 463         }
 464 
 465         for (attr = __xml_first_child(attr_set); attr != NULL; attr = __xml_next_element(attr)) {
 466             const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
 467             const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);
 468 
 469             if (safe_str_eq(name, XML_RSC_ATTR_REMOTE_NODE)) {
 470                 remote_name = value;
 471             } else if (safe_str_eq(name, "remote-addr")) {
 472                 remote_server = value;
 473             } else if (safe_str_eq(name, "remote-port")) {
 474                 remote_port = value;
 475             } else if (safe_str_eq(name, "remote-connect-timeout")) {
 476                 connect_timeout = value;
 477             } else if (safe_str_eq(name, "remote-allow-migrate")) {
 478                 remote_allow_migrate=value;
 479             } else if (safe_str_eq(name, XML_RSC_ATTR_MANAGED)) {
 480                 container_managed = value;
 481             }
 482         }
 483     }
 484 
 485     if (remote_name == NULL) {
 486         return NULL;
 487     }
 488 
 489     if (remote_id_conflict(remote_name, data)) {
 490         return NULL;
 491     }
 492 
 493     pe_create_remote_xml(parent, remote_name, container_id,
 494                          remote_allow_migrate, container_managed, "30s", "30s",
 495                          connect_timeout, remote_server, remote_port);
 496     return remote_name;
 497 }
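
/* A hypothetical example of the meta attributes expand_remote_rsc_meta() looks
 * for (the attribute names are the ones read above; the resource and node
 * names are made up). A primitive configured like this gets a remote-node
 * connection resource named "guest1" injected into the parent XML via
 * pe_create_remote_xml(), to be unpacked later like any other resource:
 *
 *     <primitive id="vm1" class="ocf" provider="heartbeat" type="VirtualDomain">
 *       <meta_attributes id="vm1-meta">
 *         <nvpair id="vm1-remote" name="remote-node" value="guest1"/>
 *         <nvpair id="vm1-addr"   name="remote-addr" value="192.168.122.10"/>
 *       </meta_attributes>
 *     </primitive>
 */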
 498 
 499 static void
 500 handle_startup_fencing(pe_working_set_t *data_set, node_t *new_node)
 501 {
 502     if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
 503         /* Ignore fencing for remote nodes that don't have a connection resource
 504          * associated with them. This happens when remote node entries get left
 505          * in the nodes section after the connection resource is removed.
 506          */
 507         return;
 508     }
 509 
 510     if (is_set(data_set->flags, pe_flag_startup_fencing)) {
 511         // All nodes are unclean until we've seen their status entry
 512         new_node->details->unclean = TRUE;
 513 
 514     } else {
 515         // Blind faith ...
 516         new_node->details->unclean = FALSE;
 517     }
 518 
  519     /* We need to be able to determine whether a node's status section
  520      * exists or not, separately from whether the node is unclean. */
 521     new_node->details->unseen = TRUE;
 522 }
 523 
 524 gboolean
 525 unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
 526 {
 527     xmlNode *xml_obj = NULL;
 528     node_t *new_node = NULL;
 529     const char *id = NULL;
 530     const char *uname = NULL;
 531     const char *type = NULL;
 532     const char *score = NULL;
 533 
 534     for (xml_obj = __xml_first_child(xml_nodes); xml_obj != NULL; xml_obj = __xml_next_element(xml_obj)) {
 535         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, TRUE)) {
 536             new_node = NULL;
 537 
 538             id = crm_element_value(xml_obj, XML_ATTR_ID);
 539             uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
 540             type = crm_element_value(xml_obj, XML_ATTR_TYPE);
 541             score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
 542             crm_trace("Processing node %s/%s", uname, id);
 543 
 544             if (id == NULL) {
 545                 crm_config_err("Must specify id tag in <node>");
 546                 continue;
 547             }
 548             new_node = pe_create_node(id, uname, type, score, data_set);
 549 
 550             if (new_node == NULL) {
 551                 return FALSE;
 552             }
 553 
 554 /*              if(data_set->have_quorum == FALSE */
 555 /*                 && data_set->no_quorum_policy == no_quorum_stop) { */
 556 /*                      /\* start shutting resources down *\/ */
 557 /*                      new_node->weight = -INFINITY; */
 558 /*              } */
 559 
 560             handle_startup_fencing(data_set, new_node);
 561 
 562             add_node_attrs(xml_obj, new_node, FALSE, data_set);
 563             unpack_instance_attributes(data_set->input, xml_obj, XML_TAG_UTILIZATION, NULL,
 564                                        new_node->details->utilization, NULL, FALSE, data_set->now);
 565 
 566             crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
 567         }
 568     }
 569 
 570     if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
 571         crm_info("Creating a fake local node");
  572         pe_create_node(data_set->localhost, data_set->localhost, NULL, NULL,
 573                        data_set);
 574     }
 575 
 576     return TRUE;
 577 }
 578 
 579 static void
 580 setup_container(resource_t * rsc, pe_working_set_t * data_set)
 581 {
 582     const char *container_id = NULL;
 583 
 584     if (rsc->children) {
 585         GListPtr gIter = rsc->children;
 586 
 587         for (; gIter != NULL; gIter = gIter->next) {
 588             resource_t *child_rsc = (resource_t *) gIter->data;
 589 
 590             setup_container(child_rsc, data_set);
 591         }
 592         return;
 593     }
 594 
 595     container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
 596     if (container_id && safe_str_neq(container_id, rsc->id)) {
 597         resource_t *container = pe_find_resource(data_set->resources, container_id);
 598 
 599         if (container) {
 600             rsc->container = container;
 601             set_bit(container->flags, pe_rsc_is_container);
 602             container->fillers = g_list_append(container->fillers, rsc);
 603             pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
 604         } else {
 605             pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
 606         }
 607     }
 608 }
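
/* For illustration (resource names are made up, and assuming
 * XML_RSC_ATTR_CONTAINER expands to "container"): a resource whose container
 * meta attribute names another resource is linked to it by setup_container(),
 * e.g.
 *
 *     <primitive id="httpd" ...>
 *       <meta_attributes id="httpd-meta">
 *         <nvpair id="httpd-container" name="container" value="httpd-vm"/>
 *       </meta_attributes>
 *     </primitive>
 *
 * makes httpd's rsc->container point at the "httpd-vm" resource and adds
 * "httpd" to httpd-vm's fillers list.
 */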
 609 
 610 gboolean
 611 unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
 612 {
 613     xmlNode *xml_obj = NULL;
 614 
 615     /* generate remote nodes from resource config before unpacking resources */
 616     for (xml_obj = __xml_first_child(xml_resources); xml_obj != NULL; xml_obj = __xml_next_element(xml_obj)) {
 617         const char *new_node_id = NULL;
 618 
 619         /* first check if this is a bare metal remote node. Bare metal remote nodes
 620          * are defined as a resource primitive only. */
 621         if (xml_contains_remote_node(xml_obj)) {
 622             new_node_id = ID(xml_obj);
 623             /* The "pe_find_node" check is here to make sure we don't iterate over
 624              * an expanded node that has already been added to the node list. */
 625             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 626                 crm_trace("Found baremetal remote node %s in container resource %s", new_node_id, ID(xml_obj));
 627                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 628                                data_set);
 629             }
 630             continue;
 631         }
 632 
 633         /* Now check for guest remote nodes.
 634          * guest remote nodes are defined within a resource primitive.
 635          * Example1: a vm resource might be configured as a remote node.
 636          * Example2: a vm resource might be configured within a group to be a remote node.
  637          * Note: right now we only support guest remote nodes as a standalone primitive
  638          * or a primitive within a group. No cloned primitives can be a guest remote node
  639          * right now. */
 640         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_RESOURCE, TRUE)) {
 641             /* expands a metadata defined remote resource into the xml config
 642              * as an actual rsc primitive to be unpacked later. */
 643             new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, data_set);
 644 
 645             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 646                 crm_trace("Found guest remote node %s in container resource %s", new_node_id, ID(xml_obj));
 647                 pe_create_node(new_node_id, new_node_id, "remote", NULL,
 648                                data_set);
 649             }
 650             continue;
 651 
 652         } else if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_GROUP, TRUE)) {
 653             xmlNode *xml_obj2 = NULL;
 654             /* search through a group to see if any of the primitive contain a remote node. */
 655             for (xml_obj2 = __xml_first_child(xml_obj); xml_obj2 != NULL; xml_obj2 = __xml_next_element(xml_obj2)) {
 656 
 657                 new_node_id = expand_remote_rsc_meta(xml_obj2, xml_resources, data_set);
 658 
 659                 if (new_node_id && pe_find_node(data_set->nodes, new_node_id) == NULL) {
 660                     crm_trace("Found guest remote node %s in container resource %s which is in group %s", new_node_id, ID(xml_obj2), ID(xml_obj));
 661                     pe_create_node(new_node_id, new_node_id, "remote", NULL,
 662                                    data_set);
 663                 }
 664             }
 665         }
 666     }
 667     return TRUE;
 668 }
 669 
 670 
 671 /* Call this after all the nodes and resources have been
 672  * unpacked, but before the status section is read.
 673  *
 674  * A remote node's online status is reflected by the state
 675  * of the remote node's connection resource. We need to link
 676  * the remote node to this connection resource so we can have
 677  * easy access to the connection resource during the PE calculations.
 678  */
 679 static void
 680 link_rsc2remotenode(pe_working_set_t *data_set, resource_t *new_rsc)
 681 {
 682     node_t *remote_node = NULL;
 683 
 684     if (new_rsc->is_remote_node == FALSE) {
 685         return;
 686     }
 687 
 688     if (is_set(data_set->flags, pe_flag_quick_location)) {
 689         /* remote_nodes and remote_resources are not linked in quick location calculations */
 690         return;
 691     }
 692 
 693     print_resource(LOG_DEBUG_3, "Linking remote-node connection resource, ", new_rsc, FALSE);
 694 
 695     remote_node = pe_find_node(data_set->nodes, new_rsc->id);
 696     CRM_CHECK(remote_node != NULL, return;);
 697 
 698     remote_node->details->remote_rsc = new_rsc;
 699     /* If this is a baremetal remote-node (no container resource
 700      * associated with it) then we need to handle startup fencing the same way
 701      * as cluster nodes. */
 702     if (new_rsc->container == NULL) {
 703         handle_startup_fencing(data_set, remote_node);
 704     } else {
  705         /* At this point we know whether the remote node is a guest (container) or
  706          * baremetal remote node; since a container is involved here, update the #kind attribute */
 707         g_hash_table_replace(remote_node->details->attrs, strdup(CRM_ATTR_KIND),
 708                              strdup("container"));
 709     }
 710 }
 711 
 712 static void
 713 destroy_tag(gpointer data)
 714 {
 715     tag_t *tag = data;
 716 
 717     if (tag) {
 718         free(tag->id);
 719         g_list_free_full(tag->refs, free);
 720         free(tag);
 721     }
 722 }
 723 
 724 /*!
 725  * \internal
 726  * \brief Parse configuration XML for resource information
 727  *
 728  * \param[in]     xml_resources  Top of resource configuration XML
 729  * \param[in,out] data_set       Where to put resource information
 730  *
 731  * \return TRUE
 732  *
 733  * \note unpack_remote_nodes() MUST be called before this, so that the nodes can
 734  *       be used when common_unpack() calls resource_location()
 735  */
 736 gboolean
 737 unpack_resources(xmlNode * xml_resources, pe_working_set_t * data_set)
 738 {
 739     xmlNode *xml_obj = NULL;
 740     GListPtr gIter = NULL;
 741 
 742     data_set->template_rsc_sets =
 743         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str,
 744                               destroy_tag);
 745 
 746     for (xml_obj = __xml_first_child(xml_resources); xml_obj != NULL; xml_obj = __xml_next_element(xml_obj)) {
 747         resource_t *new_rsc = NULL;
 748 
 749         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE, TRUE)) {
 750             const char *template_id = ID(xml_obj);
 751 
 752             if (template_id && g_hash_table_lookup_extended(data_set->template_rsc_sets,
 753                                                             template_id, NULL, NULL) == FALSE) {
  754                 /* Record the template's ID so we know it exists, even if nothing references it yet. */
 755                 g_hash_table_insert(data_set->template_rsc_sets, strdup(template_id), NULL);
 756             }
 757             continue;
 758         }
 759 
 760         crm_trace("Beginning unpack... <%s id=%s... >", crm_element_name(xml_obj), ID(xml_obj));
 761         if (common_unpack(xml_obj, &new_rsc, NULL, data_set)) {
 762             data_set->resources = g_list_append(data_set->resources, new_rsc);
 763             print_resource(LOG_DEBUG_3, "Added ", new_rsc, FALSE);
 764 
 765         } else {
 766             crm_config_err("Failed unpacking %s %s",
 767                            crm_element_name(xml_obj), crm_element_value(xml_obj, XML_ATTR_ID));
 768             if (new_rsc != NULL && new_rsc->fns != NULL) {
 769                 new_rsc->fns->free(new_rsc);
 770             }
 771         }
 772     }
 773 
 774     for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
 775         resource_t *rsc = (resource_t *) gIter->data;
 776 
 777         setup_container(rsc, data_set);
 778         link_rsc2remotenode(data_set, rsc);
 779     }
 780 
 781     data_set->resources = g_list_sort(data_set->resources, sort_rsc_priority);
 782     if (is_set(data_set->flags, pe_flag_quick_location)) {
 783         /* Ignore */
 784 
 785     } else if (is_set(data_set->flags, pe_flag_stonith_enabled)
 786                && is_set(data_set->flags, pe_flag_have_stonith_resource) == FALSE) {
 787 
 788         crm_config_err("Resource start-up disabled since no STONITH resources have been defined");
 789         crm_config_err("Either configure some or disable STONITH with the stonith-enabled option");
 790         crm_config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
 791     }
 792 
 793     return TRUE;
 794 }
 795 
 796 gboolean
 797 unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
 798 {
 799     xmlNode *xml_tag = NULL;
 800 
 801     data_set->tags =
 802         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, destroy_tag);
 803 
 804     for (xml_tag = __xml_first_child(xml_tags); xml_tag != NULL; xml_tag = __xml_next_element(xml_tag)) {
 805         xmlNode *xml_obj_ref = NULL;
 806         const char *tag_id = ID(xml_tag);
 807 
 808         if (crm_str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, TRUE) == FALSE) {
 809             continue;
 810         }
 811 
 812         if (tag_id == NULL) {
 813             crm_config_err("Failed unpacking %s: %s should be specified",
 814                            crm_element_name(xml_tag), XML_ATTR_ID);
 815             continue;
 816         }
 817 
 818         for (xml_obj_ref = __xml_first_child(xml_tag); xml_obj_ref != NULL; xml_obj_ref = __xml_next_element(xml_obj_ref)) {
 819             const char *obj_ref = ID(xml_obj_ref);
 820 
 821             if (crm_str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, TRUE) == FALSE) {
 822                 continue;
 823             }
 824 
 825             if (obj_ref == NULL) {
 826                 crm_config_err("Failed unpacking %s for tag %s: %s should be specified",
 827                                crm_element_name(xml_obj_ref), tag_id, XML_ATTR_ID);
 828                 continue;
 829             }
 830 
 831             if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
 832                 return FALSE;
 833             }
 834         }
 835     }
 836 
 837     return TRUE;
 838 }
 839 
 840 /* The ticket state section:
 841  * "/cib/status/tickets/ticket_state" */
 842 static gboolean
 843 unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
 844 {
 845     const char *ticket_id = NULL;
 846     const char *granted = NULL;
 847     const char *last_granted = NULL;
 848     const char *standby = NULL;
 849     xmlAttrPtr xIter = NULL;
 850 
 851     ticket_t *ticket = NULL;
 852 
 853     ticket_id = ID(xml_ticket);
 854     if (ticket_id == NULL || strlen(ticket_id) == 0) {
 855         return FALSE;
 856     }
 857 
 858     crm_trace("Processing ticket state for %s", ticket_id);
 859 
 860     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
 861     if (ticket == NULL) {
 862         ticket = ticket_new(ticket_id, data_set);
 863         if (ticket == NULL) {
 864             return FALSE;
 865         }
 866     }
 867 
 868     for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
 869         const char *prop_name = (const char *)xIter->name;
 870         const char *prop_value = crm_element_value(xml_ticket, prop_name);
 871 
 872         if (crm_str_eq(prop_name, XML_ATTR_ID, TRUE)) {
 873             continue;
 874         }
 875         g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
 876     }
 877 
 878     granted = g_hash_table_lookup(ticket->state, "granted");
 879     if (granted && crm_is_true(granted)) {
 880         ticket->granted = TRUE;
 881         crm_info("We have ticket '%s'", ticket->id);
 882     } else {
 883         ticket->granted = FALSE;
 884         crm_info("We do not have ticket '%s'", ticket->id);
 885     }
 886 
 887     last_granted = g_hash_table_lookup(ticket->state, "last-granted");
 888     if (last_granted) {
 889         ticket->last_granted = crm_parse_int(last_granted, 0);
 890     }
 891 
 892     standby = g_hash_table_lookup(ticket->state, "standby");
 893     if (standby && crm_is_true(standby)) {
 894         ticket->standby = TRUE;
 895         if (ticket->granted) {
 896             crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
 897         }
 898     } else {
 899         ticket->standby = FALSE;
 900     }
 901 
 902     crm_trace("Done with ticket state for %s", ticket_id);
 903 
 904     return TRUE;
 905 }
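
/* A sketch of the status-section element unpack_ticket_state() parses
 * (attribute values are illustrative). Every attribute other than "id" is
 * copied into ticket->state; "granted", "last-granted" and "standby" then get
 * the special handling seen above:
 *
 *     <ticket_state id="ticketA" granted="true" last-granted="1376321101"/>
 */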
 906 
 907 static gboolean
 908 unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
 909 {
 910     xmlNode *xml_obj = NULL;
 911 
 912     for (xml_obj = __xml_first_child(xml_tickets); xml_obj != NULL; xml_obj = __xml_next_element(xml_obj)) {
 913         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, TRUE) == FALSE) {
 914             continue;
 915         }
 916         unpack_ticket_state(xml_obj, data_set);
 917     }
 918 
 919     return TRUE;
 920 }
 921 
 922 /* @COMPAT DC < 1.1.7: Compatibility with the deprecated ticket state section:
 923  * "/cib/status/tickets/instance_attributes" */
 924 static void
 925 get_ticket_state_legacy(gpointer key, gpointer value, gpointer user_data)
 926 {
 927     const char *long_key = key;
 928     char *state_key = NULL;
 929 
 930     const char *granted_prefix = "granted-ticket-";
 931     const char *last_granted_prefix = "last-granted-";
 932     static int granted_prefix_strlen = 0;
 933     static int last_granted_prefix_strlen = 0;
 934 
 935     const char *ticket_id = NULL;
 936     const char *is_granted = NULL;
 937     const char *last_granted = NULL;
 938     const char *sep = NULL;
 939 
 940     ticket_t *ticket = NULL;
 941     pe_working_set_t *data_set = user_data;
 942 
 943     if (granted_prefix_strlen == 0) {
 944         granted_prefix_strlen = strlen(granted_prefix);
 945     }
 946 
 947     if (last_granted_prefix_strlen == 0) {
 948         last_granted_prefix_strlen = strlen(last_granted_prefix);
 949     }
 950 
 951     if (strstr(long_key, granted_prefix) == long_key) {
 952         ticket_id = long_key + granted_prefix_strlen;
 953         if (strlen(ticket_id)) {
 954             state_key = strdup("granted");
 955             is_granted = value;
 956         }
 957     } else if (strstr(long_key, last_granted_prefix) == long_key) {
 958         ticket_id = long_key + last_granted_prefix_strlen;
 959         if (strlen(ticket_id)) {
 960             state_key = strdup("last-granted");
 961             last_granted = value;
 962         }
 963     } else if ((sep = strrchr(long_key, '-'))) {
 964         ticket_id = sep + 1;
 965         state_key = strndup(long_key, strlen(long_key) - strlen(sep));
 966     }
 967 
 968     if (ticket_id == NULL || strlen(ticket_id) == 0) {
 969         free(state_key);
 970         return;
 971     }
 972 
 973     if (state_key == NULL || strlen(state_key) == 0) {
 974         free(state_key);
 975         return;
 976     }
 977 
 978     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
 979     if (ticket == NULL) {
 980         ticket = ticket_new(ticket_id, data_set);
 981         if (ticket == NULL) {
 982             free(state_key);
 983             return;
 984         }
 985     }
 986 
 987     g_hash_table_replace(ticket->state, state_key, strdup(value));
 988 
 989     if (is_granted) {
 990         if (crm_is_true(is_granted)) {
 991             ticket->granted = TRUE;
 992             crm_info("We have ticket '%s'", ticket->id);
 993         } else {
 994             ticket->granted = FALSE;
 995             crm_info("We do not have ticket '%s'", ticket->id);
 996         }
 997 
 998     } else if (last_granted) {
 999         ticket->last_granted = crm_parse_int(last_granted, 0);
1000     }
1001 }
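
/* Illustration of the legacy flattened attribute names this callback decodes
 * (the ticket id and values are made up). Each attribute name encodes both a
 * state key and a ticket id:
 *
 *     granted-ticket-ticketA="true"      ->  ticket "ticketA", key "granted"
 *     last-granted-ticketA="1376321101"  ->  ticket "ticketA", key "last-granted"
 *
 * Any other name is split at its last '-' into a state key and a ticket id.
 */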
1002 
1003 static void
1004 unpack_handle_remote_attrs(node_t *this_node, xmlNode *state, pe_working_set_t * data_set) 
1005 {
1006     const char *resource_discovery_enabled = NULL;
1007     xmlNode *attrs = NULL;
1008     resource_t *rsc = NULL;
1009     const char *shutdown = NULL;
1010 
1011     if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE) == FALSE) {
1012         return;
1013     }
1014 
1015     if ((this_node == NULL) || (is_remote_node(this_node) == FALSE)) {
1016         return;
1017     }
1018     crm_trace("Processing remote node id=%s, uname=%s", this_node->details->id, this_node->details->uname);
1019 
1020     this_node->details->remote_maintenance =
1021         crm_atoi(crm_element_value(state, XML_NODE_IS_MAINTENANCE), "0");
1022 
1023     rsc = this_node->details->remote_rsc;
1024     if (this_node->details->remote_requires_reset == FALSE) {
1025         this_node->details->unclean = FALSE;
1026         this_node->details->unseen = FALSE;
1027     }
1028     attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
1029     add_node_attrs(attrs, this_node, TRUE, data_set);
1030 
1031     shutdown = pe_node_attribute_raw(this_node, XML_CIB_ATTR_SHUTDOWN);
1032     if (shutdown != NULL && safe_str_neq("0", shutdown)) {
1033         crm_info("Node %s is shutting down", this_node->details->uname);
1034         this_node->details->shutdown = TRUE;
1035         if (rsc) {
1036             rsc->next_role = RSC_ROLE_STOPPED;
1037         }
1038     }
1039  
1040     if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
1041         crm_info("Node %s is in standby-mode", this_node->details->uname);
1042         this_node->details->standby = TRUE;
1043     }
1044 
1045     if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance")) ||
1046         (rsc && !is_set(rsc->flags, pe_rsc_managed))) {
1047         crm_info("Node %s is in maintenance-mode", this_node->details->uname);
1048         this_node->details->maintenance = TRUE;
1049     }
1050 
1051     resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
1052     if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
1053         if (is_baremetal_remote_node(this_node) && is_not_set(data_set->flags, pe_flag_stonith_enabled)) {
1054             crm_warn("ignoring %s attribute on baremetal remote node %s, disabling resource discovery requires stonith to be enabled.",
1055                      XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
1056         } else {
 1057             /* If we're here, this is either a baremetal node with fencing enabled,
 1058              * or a container node, where we don't care whether fencing is enabled.
 1059              * Container nodes are 'fenced' by recovering the container resource
 1060              * regardless of whether fencing is enabled. */
1061             crm_info("Node %s has resource discovery disabled", this_node->details->uname);
1062             this_node->details->rsc_discovery_enabled = FALSE;
1063         }
1064     }
1065 }
1066 
1067 static bool
1068 unpack_node_loop(xmlNode * status, bool fence, pe_working_set_t * data_set) 
1069 {
1070     bool changed = false;
1071     xmlNode *lrm_rsc = NULL;
1072 
1073     for (xmlNode *state = __xml_first_child(status); state != NULL; state = __xml_next_element(state)) {
1074         const char *id = NULL;
1075         const char *uname = NULL;
1076         node_t *this_node = NULL;
1077         bool process = FALSE;
1078 
1079         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE) == FALSE) {
1080             continue;
1081         }
1082 
1083         id = crm_element_value(state, XML_ATTR_ID);
1084         uname = crm_element_value(state, XML_ATTR_UNAME);
1085         this_node = pe_find_node_any(data_set->nodes, id, uname);
1086 
1087         if (this_node == NULL) {
1088             crm_info("Node %s is unknown", id);
1089             continue;
1090 
1091         } else if (this_node->details->unpacked) {
1092             crm_info("Node %s is already processed", id);
1093             continue;
1094 
1095         } else if (is_remote_node(this_node) == FALSE && is_set(data_set->flags, pe_flag_stonith_enabled)) {
1096             // A redundant test, but preserves the order for regression tests
1097             process = TRUE;
1098 
1099         } else if (is_remote_node(this_node)) {
1100             bool check = FALSE;
1101             resource_t *rsc = this_node->details->remote_rsc;
1102 
1103             if(fence) {
1104                 check = TRUE;
1105 
1106             } else if(rsc == NULL) {
1107                 /* Not ready yet */
1108 
1109             } else if (is_container_remote_node(this_node)
1110                        && rsc->role == RSC_ROLE_STARTED
1111                        && rsc->container->role == RSC_ROLE_STARTED) {
1112                 /* Both the connection and the underlying container
 1113                  * need to be known 'up' before we voluntarily process
1114                  * resources inside it
1115                  */
1116                 check = TRUE;
1117                 crm_trace("Checking node %s/%s/%s status %d/%d/%d", id, rsc->id, rsc->container->id, fence, rsc->role, RSC_ROLE_STARTED);
1118 
1119             } else if (is_container_remote_node(this_node) == FALSE
1120                        && rsc->role == RSC_ROLE_STARTED) {
1121                 check = TRUE;
1122                 crm_trace("Checking node %s/%s status %d/%d/%d", id, rsc->id, fence, rsc->role, RSC_ROLE_STARTED);
1123             }
1124 
1125             if (check) {
1126                 determine_remote_online_status(data_set, this_node);
1127                 unpack_handle_remote_attrs(this_node, state, data_set);
1128                 process = TRUE;
1129             }
1130 
1131         } else if (this_node->details->online) {
1132             process = TRUE;
1133 
1134         } else if (fence) {
1135             process = TRUE;
1136         }
1137 
1138         if(process) {
1139             crm_trace("Processing lrm resource entries on %shealthy%s node: %s",
1140                       fence?"un":"", is_remote_node(this_node)?" remote":"",
1141                       this_node->details->uname);
1142             changed = TRUE;
1143             this_node->details->unpacked = TRUE;
1144 
1145             lrm_rsc = find_xml_node(state, XML_CIB_TAG_LRM, FALSE);
1146             lrm_rsc = find_xml_node(lrm_rsc, XML_LRM_TAG_RESOURCES, FALSE);
1147             unpack_lrm_resources(this_node, lrm_rsc, data_set);
1148         }
1149     }
1150     return changed;
1151 }
1152 
 1153 /* Remove nodes that are down or stopping */
 1154 /* Create positive rsc_to_node constraints between resources and the nodes they are running on */
 1155 /* anything else? */
1156 gboolean
1157 unpack_status(xmlNode * status, pe_working_set_t * data_set)
1158 {
1159     const char *id = NULL;
1160     const char *uname = NULL;
1161 
1162     xmlNode *state = NULL;
1163     node_t *this_node = NULL;
1164 
1165     crm_trace("Beginning unpack");
1166 
1167     if (data_set->tickets == NULL) {
1168         data_set->tickets =
1169             g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, destroy_ticket);
1170     }
1171 
1172     for (state = __xml_first_child(status); state != NULL; state = __xml_next_element(state)) {
1173         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, TRUE)) {
1174             xmlNode *xml_tickets = state;
1175             GHashTable *state_hash = NULL;
1176 
1177             /* @COMPAT DC < 1.1.7: Compatibility with the deprecated ticket state section:
1178              * Unpack the attributes in the deprecated "/cib/status/tickets/instance_attributes" if it exists. */
1179             state_hash = crm_str_table_new();
1180 
1181             unpack_instance_attributes(data_set->input, xml_tickets, XML_TAG_ATTR_SETS, NULL,
1182                                        state_hash, NULL, TRUE, data_set->now);
1183 
1184             g_hash_table_foreach(state_hash, get_ticket_state_legacy, data_set);
1185 
1186             if (state_hash) {
1187                 g_hash_table_destroy(state_hash);
1188             }
1189 
1190             /* Unpack the new "/cib/status/tickets/ticket_state"s */
1191             unpack_tickets_state(xml_tickets, data_set);
1192         }
1193 
1194         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE)) {
1195             xmlNode *attrs = NULL;
1196             const char *resource_discovery_enabled = NULL;
1197 
1198             id = crm_element_value(state, XML_ATTR_ID);
1199             uname = crm_element_value(state, XML_ATTR_UNAME);
1200             this_node = pe_find_node_any(data_set->nodes, id, uname);
1201 
1202             if (uname == NULL) {
1203                 /* error */
1204                 continue;
1205 
1206             } else if (this_node == NULL) {
1207                 crm_config_warn("Node %s in status section no longer exists", uname);
1208                 continue;
1209 
1210             } else if (is_remote_node(this_node)) {
 1211                 /* Online state for remote nodes is determined by the connection
 1212                  * resource state after all the unpacking is done. However, we do
 1213                  * need to mark whether the node has been fenced, as this plays
 1214                  * a role when unpacking cluster node resource state */
1215                 this_node->details->remote_was_fenced = 
1216                     crm_atoi(crm_element_value(state, XML_NODE_IS_FENCED), "0");
1217                 continue;
1218             }
1219 
1220             crm_trace("Processing node id=%s, uname=%s", id, uname);
1221 
1222             /* Mark the node as provisionally clean
1223              * - at least we have seen it in the current cluster's lifetime
1224              */
1225             this_node->details->unclean = FALSE;
1226             this_node->details->unseen = FALSE;
1227             attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
1228             add_node_attrs(attrs, this_node, TRUE, data_set);
1229 
1230             if (crm_is_true(pe_node_attribute_raw(this_node, "standby"))) {
1231                 crm_info("Node %s is in standby-mode", this_node->details->uname);
1232                 this_node->details->standby = TRUE;
1233             }
1234 
1235             if (crm_is_true(pe_node_attribute_raw(this_node, "maintenance"))) {
1236                 crm_info("Node %s is in maintenance-mode", this_node->details->uname);
1237                 this_node->details->maintenance = TRUE;
1238             }
1239 
1240             resource_discovery_enabled = pe_node_attribute_raw(this_node, XML_NODE_ATTR_RSC_DISCOVERY);
1241             if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
1242                 crm_warn("ignoring %s attribute on node %s, disabling resource discovery is not allowed on cluster nodes",
1243                     XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
1244             }
1245 
1246             crm_trace("determining node state");
1247             determine_online_status(state, this_node, data_set);
1248 
1249             if (is_not_set(data_set->flags, pe_flag_have_quorum)
1250                 && this_node->details->online
1251                 && (data_set->no_quorum_policy == no_quorum_suicide)) {
1252                 /* Everything else should flow from this automatically,
1253                  * at least until the PE becomes able to migrate off healthy resources.
1254                  */
1255                 pe_fence_node(data_set, this_node, "cluster does not have quorum");
1256             }
1257         }
1258     }
1259 
1260 
1261     while(unpack_node_loop(status, FALSE, data_set)) {
1262         crm_trace("Start another loop");
1263     }
1264 
1265     // Now catch any nodes we didn't see
1266     unpack_node_loop(status, is_set(data_set->flags, pe_flag_stonith_enabled), data_set);
1267 
1268     for (GListPtr gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
1269         node_t *this_node = gIter->data;
1270 
1271         if (this_node == NULL) {
1272             continue;
1273         } else if(is_remote_node(this_node) == FALSE) {
1274             continue;
1275         } else if(this_node->details->unpacked) {
1276             continue;
1277         }
1278         determine_remote_online_status(data_set, this_node);
1279     }
1280 
1281     return TRUE;
1282 }
1283 
1284 static gboolean
1285 determine_online_status_no_fencing(pe_working_set_t * data_set, xmlNode * node_state,
1286                                    node_t * this_node)
1287 {
1288     gboolean online = FALSE;
1289     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1290     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1291     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1292     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1293 
1294     if (!crm_is_true(in_cluster)) {
1295         crm_trace("Node is down: in_cluster=%s", crm_str(in_cluster));
1296 
1297     } else if (safe_str_eq(is_peer, ONLINESTATUS)) {
1298         if (safe_str_eq(join, CRMD_JOINSTATE_MEMBER)) {
1299             online = TRUE;
1300         } else {
1301             crm_debug("Node is not ready to run resources: %s", join);
1302         }
1303 
1304     } else if (this_node->details->expected_up == FALSE) {
1305         crm_trace("CRMd is down: in_cluster=%s", crm_str(in_cluster));
1306         crm_trace("\tis_peer=%s, join=%s, expected=%s",
1307                   crm_str(is_peer), crm_str(join), crm_str(exp_state));
1308 
1309     } else {
1310         /* mark it unclean */
1311         pe_fence_node(data_set, this_node, "peer is unexpectedly down");
1312         crm_info("\tin_cluster=%s, is_peer=%s, join=%s, expected=%s",
1313                  crm_str(in_cluster), crm_str(is_peer), crm_str(join), crm_str(exp_state));
1314     }
1315     return online;
1316 }
1317 
1318 static gboolean
1319 determine_online_status_fencing(pe_working_set_t * data_set, xmlNode * node_state,
1320                                 node_t * this_node)
1321 {
1322     gboolean online = FALSE;
1323     gboolean do_terminate = FALSE;
1324     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
1325     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
1326     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
1327     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1328     const char *terminate = pe_node_attribute_raw(this_node, "terminate");
1329 
1330 /*
1331   - XML_NODE_IN_CLUSTER    ::= true|false
1332   - XML_NODE_IS_PEER       ::= true|false|online|offline
1333   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
1334   - XML_NODE_EXPECTED      ::= member|down
1335 */
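/* A node_state entry feeding this logic looks roughly like the following
 * (attribute names correspond to the XML_NODE_* constants above; the values
 * shown are illustrative only):
 *
 *   <node_state id="1" uname="node1" in_ccm="true" crmd="online"
 *               join="member" expected="member"/>
 */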
1336 
1337     if (crm_is_true(terminate)) {
1338         do_terminate = TRUE;
1339 
1340     } else if (terminate != NULL && strlen(terminate) > 0) {
1341         /* could be a time() value */
1342         char t = terminate[0];
1343 
1344         if (t != '0' && isdigit(t)) {
1345             do_terminate = TRUE;
1346         }
1347     }
1348 
1349     crm_trace("%s: in_cluster=%s, is_peer=%s, join=%s, expected=%s, term=%d",
1350               this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1351               crm_str(join), crm_str(exp_state), do_terminate);
1352 
1353     online = crm_is_true(in_cluster);
1354     if (safe_str_eq(is_peer, ONLINESTATUS)) {
1355         is_peer = XML_BOOLEAN_YES;
1356     }
1357     if (exp_state == NULL) {
1358         exp_state = CRMD_JOINSTATE_DOWN;
1359     }
1360 
1361     if (this_node->details->shutdown) {
1362         crm_debug("%s is shutting down", this_node->details->uname);
1363 
1364         /* Slightly different criteria since we can't shut down a dead peer */
1365         online = crm_is_true(is_peer);
1366 
1367     } else if (in_cluster == NULL) {
1368         pe_fence_node(data_set, this_node, "peer has not been seen by the cluster");
1369 
1370     } else if (safe_str_eq(join, CRMD_JOINSTATE_NACK)) {
1371         pe_fence_node(data_set, this_node, "peer failed the pacemaker membership criteria");
1372 
1373     } else if (do_terminate == FALSE && safe_str_eq(exp_state, CRMD_JOINSTATE_DOWN)) {
1374 
1375         if (crm_is_true(in_cluster) || crm_is_true(is_peer)) {
1376             crm_info("- Node %s is not ready to run resources", this_node->details->uname);
1377             this_node->details->standby = TRUE;
1378             this_node->details->pending = TRUE;
1379 
1380         } else {
1381             crm_trace("%s is down or still coming up", this_node->details->uname);
1382         }
1383 
1384     } else if (do_terminate && safe_str_eq(join, CRMD_JOINSTATE_DOWN)
1385                && crm_is_true(in_cluster) == FALSE && crm_is_true(is_peer) == FALSE) {
1386         crm_info("Node %s was just shot", this_node->details->uname);
1387         online = FALSE;
1388 
1389     } else if (crm_is_true(in_cluster) == FALSE) {
1390         pe_fence_node(data_set, this_node, "peer is no longer part of the cluster");
1391 
1392     } else if (crm_is_true(is_peer) == FALSE) {
1393         pe_fence_node(data_set, this_node, "peer process is no longer available");
1394 
1395         /* Everything is running at this point, now check join state */
1396     } else if (do_terminate) {
1397         pe_fence_node(data_set, this_node, "termination was requested");
1398 
1399     } else if (safe_str_eq(join, CRMD_JOINSTATE_MEMBER)) {
1400         crm_info("Node %s is active", this_node->details->uname);
1401 
1402     } else if (safe_str_eq(join, CRMD_JOINSTATE_PENDING)
1403                || safe_str_eq(join, CRMD_JOINSTATE_DOWN)) {
1404         crm_info("Node %s is not ready to run resources", this_node->details->uname);
1405         this_node->details->standby = TRUE;
1406         this_node->details->pending = TRUE;
1407 
1408     } else {
1409         pe_fence_node(data_set, this_node, "peer was in an unknown state");
1410         crm_warn("%s: in-cluster=%s, is-peer=%s, join=%s, expected=%s, term=%d, shutdown=%d",
1411                  this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
1412                  crm_str(join), crm_str(exp_state), do_terminate, this_node->details->shutdown);
1413     }
1414 
1415     return online;
1416 }
1417 
1418 static gboolean
1419 determine_remote_online_status(pe_working_set_t * data_set, node_t * this_node)
1420 {
1421     resource_t *rsc = this_node->details->remote_rsc;
1422     resource_t *container = NULL;
1423     pe_node_t *host = NULL;
1424 
1425     /* If there is a node state entry for a (former) Pacemaker Remote node
1426      * but no resource creating that node, the node's connection resource will
1427      * be NULL. Consider it an offline remote node in that case.
1428      */
1429     if (rsc == NULL) {
1430         this_node->details->online = FALSE;
1431         goto remote_online_done;
1432     }
1433 
1434     container = rsc->container;
1435 
1436     if (container && (g_list_length(rsc->running_on) == 1)) {
1437         host = rsc->running_on->data;
1438     }
1439 
1440     /* If the resource is currently started, mark it online. */
1441     if (rsc->role == RSC_ROLE_STARTED) {
1442         crm_trace("%s node %s presumed ONLINE because connection resource is started",
1443                   (container? "Guest" : "Remote"), this_node->details->id);
1444         this_node->details->online = TRUE;
1445     }
1446 
1447     /* consider this node shutting down if transitioning start->stop */
1448     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
1449         crm_trace("%s node %s shutting down because connection resource is stopping",
1450                   (container? "Guest" : "Remote"), this_node->details->id);
1451         this_node->details->shutdown = TRUE;
1452     }
1453 
1454     /* Now check all the failure conditions. */
1455     if(container && is_set(container->flags, pe_rsc_failed)) {
1456         crm_trace("Guest node %s UNCLEAN because guest resource failed",
1457                   this_node->details->id);
1458         this_node->details->online = FALSE;
1459         this_node->details->remote_requires_reset = TRUE;
1460 
1461     } else if(is_set(rsc->flags, pe_rsc_failed)) {
1462         crm_trace("%s node %s OFFLINE because connection resource failed",
1463                   (container? "Guest" : "Remote"), this_node->details->id);
1464         this_node->details->online = FALSE;
1465 
1466     } else if (rsc->role == RSC_ROLE_STOPPED
1467         || (container && container->role == RSC_ROLE_STOPPED)) {
1468 
1469         crm_trace("%s node %s OFFLINE because its resource is stopped",
1470                   (container? "Guest" : "Remote"), this_node->details->id);
1471         this_node->details->online = FALSE;
1472         this_node->details->remote_requires_reset = FALSE;
1473 
1474     } else if (host && (host->details->online == FALSE)
1475                && host->details->unclean) {
1476         crm_trace("Guest node %s UNCLEAN because host is unclean",
1477                   this_node->details->id);
1478         this_node->details->online = FALSE;
1479         this_node->details->remote_requires_reset = TRUE;
1480     }
1481 
1482 remote_online_done:
1483     crm_trace("Remote node %s online=%s",
1484         this_node->details->id, this_node->details->online ? "TRUE" : "FALSE");
1485     return this_node->details->online;
1486 }
1487 
1488 gboolean
1489 determine_online_status(xmlNode * node_state, node_t * this_node, pe_working_set_t * data_set)
1490 {
1491     gboolean online = FALSE;
1492     const char *shutdown = NULL;
1493     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
1494 
1495     if (this_node == NULL) {
1496         crm_config_err("No node to check");
1497         return online;
1498     }
1499 
1500     this_node->details->shutdown = FALSE;
1501     this_node->details->expected_up = FALSE;
1502     shutdown = pe_node_attribute_raw(this_node, XML_CIB_ATTR_SHUTDOWN);
1503 
1504     if (shutdown != NULL && safe_str_neq("0", shutdown)) {
1505         this_node->details->shutdown = TRUE;
1506 
1507     } else if (safe_str_eq(exp_state, CRMD_JOINSTATE_MEMBER)) {
1508         this_node->details->expected_up = TRUE;
1509     }
1510 
1511     if (this_node->details->type == node_ping) {
1512         this_node->details->unclean = FALSE;
1513         online = FALSE;         /* As far as resource management is concerned,
1514                                  * the node is safely offline.
1515                                  * Anyone caught abusing this logic will be shot
1516                                  */
1517 
1518     } else if (is_set(data_set->flags, pe_flag_stonith_enabled) == FALSE) {
1519         online = determine_online_status_no_fencing(data_set, node_state, this_node);
1520 
1521     } else {
1522         online = determine_online_status_fencing(data_set, node_state, this_node);
1523     }
1524 
1525     if (online) {
1526         this_node->details->online = TRUE;
1527 
1528     } else {
1529         /* remove node from contention */
1530         this_node->fixed = TRUE;
1531         this_node->weight = -INFINITY;
1532     }
1533 
1534     if (online && this_node->details->shutdown) {
1535         /* don't run resources here */
1536         this_node->fixed = TRUE;
1537         this_node->weight = -INFINITY;
1538     }
1539 
1540     if (this_node->details->type == node_ping) {
1541         crm_info("Node %s is not a pacemaker node", this_node->details->uname);
1542 
1543     } else if (this_node->details->unclean) {
1544         pe_proc_warn("Node %s is unclean", this_node->details->uname);
1545 
1546     } else if (this_node->details->online) {
1547         crm_info("Node %s is %s", this_node->details->uname,
1548                  this_node->details->shutdown ? "shutting down" :
1549                  this_node->details->pending ? "pending" :
1550                  this_node->details->standby ? "standby" :
1551                  this_node->details->maintenance ? "maintenance" : "online");
1552 
1553     } else {
1554         crm_trace("Node %s is offline", this_node->details->uname);
1555     }
1556 
1557     return online;
1558 }
1559 
1560 /*!
1561  * \internal
1562  * \brief Find the end of a resource's name, excluding any clone suffix
1563  *
1564  * \param[in] id  Resource ID to check
1565  *
1566  * \return Pointer to last character of resource's base name
1567  */
1568 const char *
1569 pe_base_name_end(const char *id)
1570 {
1571     if (!crm_strlen_zero(id)) {
1572         const char *end = id + strlen(id) - 1;
1573 
1574         for (const char *s = end; s > id; --s) {
1575             switch (*s) {
1576                 case '0':
1577                 case '1':
1578                 case '2':
1579                 case '3':
1580                 case '4':
1581                 case '5':
1582                 case '6':
1583                 case '7':
1584                 case '8':
1585                 case '9':
1586                     break;
1587                 case ':':
1588                     return (s == end)? s : (s - 1);
1589                 default:
1590                     return end;
1591             }
1592         }
1593         return end;
1594     }
1595     return NULL;
1596 }
1597 
1598 /*!
1599  * \internal
1600  * \brief Get a resource name excluding any clone suffix
1601  *
1602  * \param[in] last_rsc_id  Resource ID to check
1603  *
1604  * \return Pointer to newly allocated string with resource's base name
1605  * \note It is the caller's responsibility to free() the result.
1606  *       This asserts on error, so callers can assume result is not NULL.
1607  */
1608 char *
1609 clone_strip(const char *last_rsc_id)
1610 {
1611     const char *end = pe_base_name_end(last_rsc_id);
1612     char *basename = NULL;
1613 
1614     CRM_ASSERT(end);
1615     basename = strndup(last_rsc_id, end - last_rsc_id + 1);
1616     CRM_ASSERT(basename);
1617     return basename;
1618 }
1619 
1620 /*!
1621  * \internal
1622  * \brief Get the name of the first instance of a cloned resource
1623  *
1624  * \param[in] last_rsc_id  Resource ID to check
1625  *
1626  * \return Pointer to newly allocated string with resource's base name plus :0
1627  * \note It is the caller's responsibility to free() the result.
1628  *       This asserts on error, so callers can assume result is not NULL.
1629  */
1630 char *
1631 clone_zero(const char *last_rsc_id)
1632 {
1633     const char *end = pe_base_name_end(last_rsc_id);
1634     size_t base_name_len = end - last_rsc_id + 1;
1635     char *zero = NULL;
1636 
1637     CRM_ASSERT(end);
1638     zero = calloc(base_name_len + 3, sizeof(char));
1639     CRM_ASSERT(zero);
1640     memcpy(zero, last_rsc_id, base_name_len);
1641     zero[base_name_len] = ':';
1642     zero[base_name_len + 1] = '0';
1643     return zero;
1644 }
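/* Illustrative behaviour of the helpers above:
 *   pe_base_name_end("myrsc:10")  returns a pointer to the final 'c' of "myrsc"
 *   clone_strip("myrsc:10")       returns a newly allocated "myrsc"
 *   clone_zero("myrsc")           returns a newly allocated "myrsc:0"
 */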
1645 
1646 static resource_t *
1647 create_fake_resource(const char *rsc_id, xmlNode * rsc_entry, pe_working_set_t * data_set)
1648 {
1649     resource_t *rsc = NULL;
1650     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
1651 
1652     copy_in_properties(xml_rsc, rsc_entry);
1653     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
1654     crm_log_xml_debug(xml_rsc, "Orphan resource");
1655 
1656     if (!common_unpack(xml_rsc, &rsc, NULL, data_set)) {
1657         return NULL;
1658     }
1659 
1660     if (xml_contains_remote_node(xml_rsc)) {
1661         node_t *node;
1662 
1663         crm_debug("Detected orphaned remote node %s", rsc_id);
1664         node = pe_find_node(data_set->nodes, rsc_id);
1665         if (node == NULL) {
1666                 node = pe_create_node(rsc_id, rsc_id, "remote", NULL, data_set);
1667         }
1668         link_rsc2remotenode(data_set, rsc);
1669 
1670         if (node) {
1671             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
1672             node->details->shutdown = TRUE;
1673         }
1674     }
1675 
1676     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
1677         /* This orphaned rsc needs to be mapped to a container. */
1678         crm_trace("Detected orphaned container filler %s", rsc_id);
1679         set_bit(rsc->flags, pe_rsc_orphan_container_filler);
1680     }
1681     set_bit(rsc->flags, pe_rsc_orphan);
1682     data_set->resources = g_list_append(data_set->resources, rsc);
1683     return rsc;
1684 }
1685 
1686 extern resource_t *create_child_clone(resource_t * rsc, int sub_id, pe_working_set_t * data_set);
1687 
1688 static resource_t *
1689 find_anonymous_clone(pe_working_set_t * data_set, node_t * node, resource_t * parent,
1690                      const char *rsc_id)
1691 {
1692     GListPtr rIter = NULL;
1693     resource_t *rsc = NULL;
1694     gboolean skip_inactive = FALSE;
1695 
1696     CRM_ASSERT(parent != NULL);
1697     CRM_ASSERT(pe_rsc_is_clone(parent));
1698     CRM_ASSERT(is_not_set(parent->flags, pe_rsc_unique));
1699 
1700     /* Find an instance active (or partially active for grouped clones) on the specified node */
1701     pe_rsc_trace(parent, "Looking for %s on %s in %s", rsc_id, node->details->uname, parent->id);
1702     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1703         GListPtr nIter = NULL;
1704         GListPtr locations = NULL;
1705         resource_t *child = rIter->data;
1706 
1707         child->fns->location(child, &locations, TRUE);
1708         if (locations == NULL) {
1709             pe_rsc_trace(child, "Resource %s, skip inactive", child->id);
1710             continue;
1711         }
1712 
1713         for (nIter = locations; nIter && rsc == NULL; nIter = nIter->next) {
1714             node_t *childnode = nIter->data;
1715 
1716             if (childnode->details == node->details) {
1717                 /* ->find_rsc() because we might be a cloned group */
1718                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1719                 if(rsc) {
1720                     pe_rsc_trace(rsc, "Resource %s, active", rsc->id);
1721                 }
1722             }
1723 
1724             /* Keep this block, it means we'll do the right thing if
1725              * anyone toggles the unique flag to 'off'
1726              */
1727             if (rsc && rsc->running_on) {
1728                 crm_notice("/Anonymous/ clone %s is already running on %s",
1729                            parent->id, node->details->uname);
1730                 skip_inactive = TRUE;
1731                 rsc = NULL;
1732             }
1733         }
1734 
1735         g_list_free(locations);
1736     }
1737 
1738     /* Find an inactive instance */
1739     if (skip_inactive == FALSE) {
1740         pe_rsc_trace(parent, "Looking for %s anywhere", rsc_id);
1741         for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
1742             GListPtr locations = NULL;
1743             resource_t *child = rIter->data;
1744 
1745             if (is_set(child->flags, pe_rsc_block)) {
1746                 pe_rsc_trace(child, "Skip: blocked in stopped state");
1747                 continue;
1748             }
1749 
1750             child->fns->location(child, &locations, TRUE);
1751             if (locations == NULL) {
1752                 /* ->find_rsc() because we might be a cloned group */
1753                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
1754                 pe_rsc_trace(parent, "Resource %s, empty slot", rsc->id);
1755             }
1756             g_list_free(locations);
1757         }
1758     }
1759 
1760     if (rsc == NULL) {
1761         /* Create an extra orphan */
1762         resource_t *top = create_child_clone(parent, -1, data_set);
1763 
1764         /* ->find_rsc() because we might be a cloned group */
1765         rsc = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
1766         CRM_ASSERT(rsc != NULL);
1767 
1768         pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s", top->id, parent->id, rsc_id,
1769                      node->details->uname);
1770     }
1771 
1772     if (safe_str_neq(rsc_id, rsc->id)) {
1773         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
1774                     rsc_id, node->details->uname, rsc->id,
1775                     is_set(rsc->flags, pe_rsc_orphan) ? " (ORPHAN)" : "");
1776     }
1777 
1778     return rsc;
1779 }
1780 
1781 static resource_t *
1782 unpack_find_resource(pe_working_set_t * data_set, node_t * node, const char *rsc_id,
1783                      xmlNode * rsc_entry)
1784 {
1785     resource_t *rsc = NULL;
1786     resource_t *parent = NULL;
1787 
1788     crm_trace("looking for %s", rsc_id);
1789     rsc = pe_find_resource(data_set->resources, rsc_id);
1790 
1791     /* no match */
1792     if (rsc == NULL) {
1793         /* Even when clone-max=0, we still create a single :0 orphan to match against */
1794         char *tmp = clone_zero(rsc_id);
1795         resource_t *clone0 = pe_find_resource(data_set->resources, tmp);
1796 
1797         if (clone0 && is_not_set(clone0->flags, pe_rsc_unique)) {
1798             rsc = clone0;
1799         } else {
1800             crm_trace("%s is not known as %s either", rsc_id, tmp);
1801         }
1802 
1803         parent = uber_parent(clone0);
1804         free(tmp);
1805 
1806         crm_trace("%s not found: %s", rsc_id, parent ? parent->id : "orphan");
1807 
1808     } else if (rsc->variant > pe_native) {
1809         crm_trace("%s is no longer a primitive resource, the lrm_resource entry is obsolete",
1810                   rsc_id);
1811         return NULL;
1812 
1813     } else {
1814         parent = uber_parent(rsc);
1815     }
1816 
1817     if(parent && parent->parent) {
1818         rsc = find_container_child(rsc_id, rsc, node);
1819 
1820     } else if (pe_rsc_is_clone(parent)) {
1821         if (is_not_set(parent->flags, pe_rsc_unique)) {
1822             char *base = clone_strip(rsc_id);
1823 
1824             rsc = find_anonymous_clone(data_set, node, parent, base);
1825             CRM_ASSERT(rsc != NULL);
1826             free(base);
1827         }
1828 
1829         if (rsc && safe_str_neq(rsc_id, rsc->id)) {
1830             free(rsc->clone_name);
1831             rsc->clone_name = strdup(rsc_id);
1832         }
1833     }
1834 
1835     return rsc;
1836 }
1837 
1838 static resource_t *
1839 process_orphan_resource(xmlNode * rsc_entry, node_t * node, pe_working_set_t * data_set)
1840 {
1841     resource_t *rsc = NULL;
1842     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
1843 
1844     crm_debug("Detected orphan resource %s on %s", rsc_id, node->details->uname);
1845     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
1846 
1847     if (is_set(data_set->flags, pe_flag_stop_rsc_orphans) == FALSE) {
1848         clear_bit(rsc->flags, pe_rsc_managed);
1849 
1850     } else {
1851         print_resource(LOG_DEBUG_3, "Added orphan", rsc, FALSE);
1852 
1853         CRM_CHECK(rsc != NULL, return NULL);
1854         resource_location(rsc, NULL, -INFINITY, "__orphan_dont_run__", data_set);
1855     }
1856     return rsc;
1857 }
1858 
1859 static void
1860 process_rsc_state(resource_t * rsc, node_t * node,
1861                   enum action_fail_response on_fail,
1862                   xmlNode * migrate_op, pe_working_set_t * data_set)
1863 {
1864     node_t *tmpnode = NULL;
1865     char *reason = NULL;
1866 
1867     CRM_ASSERT(rsc);
1868     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
1869                  rsc->id, role2text(rsc->role), node->details->uname, fail2text(on_fail));
1870 
1871     /* process current state */
1872     if (rsc->role != RSC_ROLE_UNKNOWN) {
1873         resource_t *iter = rsc;
1874 
1875         while (iter) {
1876             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
1877                 node_t *n = node_copy(node);
1878 
1879                 pe_rsc_trace(rsc, "%s (aka. %s) known on %s", rsc->id, rsc->clone_name,
1880                              n->details->uname);
1881                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
1882             }
1883             if (is_set(iter->flags, pe_rsc_unique)) {
1884                 break;
1885             }
1886             iter = iter->parent;
1887         }
1888     }
1889 
1890     /* If a managed resource is believed to be running, but node is down ... */
1891     if (rsc->role > RSC_ROLE_STOPPED
1892         && node->details->online == FALSE
1893         && node->details->maintenance == FALSE
1894         && is_set(rsc->flags, pe_rsc_managed)) {
1895 
1896         gboolean should_fence = FALSE;
1897 
1898         /* If this is a guest node, fence it (regardless of whether fencing is
1899          * enabled, because guest node fencing is done by recovery of the
1900          * container resource rather than by stonithd). Mark the resource
1901          * we're processing as failed. When the guest comes back up, its
1902          * operation history in the CIB will be cleared, freeing the affected
1903          * resource to run again once we are sure we know its state.
1904          */
1905         if (is_container_remote_node(node)) {
1906             set_bit(rsc->flags, pe_rsc_failed);
1907             should_fence = TRUE;
1908 
1909         } else if (is_set(data_set->flags, pe_flag_stonith_enabled)) {
1910             if (is_baremetal_remote_node(node) && node->details->remote_rsc
1911                 && is_not_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
1912 
1913                 /* Setting unseen = TRUE means that fencing of the remote node will
1914                  * only occur if the connection resource is not going to start somewhere.
1915                  * This allows connection resources on a failed cluster node to move to
1916                  * another node without requiring the baremetal remote nodes to be fenced
1917                  * as well. */
1918                 node->details->unseen = TRUE;
1919                 reason = crm_strdup_printf("%s is active there (fencing will be"
1920                                            " revoked if remote connection can "
1921                                            "be re-established elsewhere)",
1922                                            rsc->id);
1923             }
1924             should_fence = TRUE;
1925         }
1926 
1927         if (should_fence) {
1928             if (reason == NULL) {
1929                reason = crm_strdup_printf("%s is thought to be active there", rsc->id);
1930             }
1931             pe_fence_node(data_set, node, reason);
1932         }
1933         free(reason);
1934     }
1935 
1936     if (node->details->unclean) {
1937         /* No extra processing needed
1938          * Also allows resources to be started again after a node is shot
1939          */
1940         on_fail = action_fail_ignore;
1941     }
1942 
1943     switch (on_fail) {
1944         case action_fail_ignore:
1945             /* nothing to do */
1946             break;
1947 
1948         case action_fail_fence:
1949             /* treat it as if it is still running
1950              * but also mark the node as unclean
1951              */
1952             reason = crm_strdup_printf("%s failed there", rsc->id);
1953             pe_fence_node(data_set, node, reason);
1954             free(reason);
1955             break;
1956 
1957         case action_fail_standby:
1958             node->details->standby = TRUE;
1959             node->details->standby_onfail = TRUE;
1960             break;
1961 
1962         case action_fail_block:
1963             /* is_managed == FALSE will prevent any
1964              * actions being sent for the resource
1965              */
1966             clear_bit(rsc->flags, pe_rsc_managed);
1967             set_bit(rsc->flags, pe_rsc_block);
1968             break;
1969 
1970         case action_fail_migrate:
1971             /* make sure it comes up somewhere else
1972              * or not at all
1973              */
1974             resource_location(rsc, node, -INFINITY, "__action_migration_auto__", data_set);
1975             break;
1976 
1977         case action_fail_stop:
1978             rsc->next_role = RSC_ROLE_STOPPED;
1979             break;
1980 
1981         case action_fail_recover:
1982             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
1983                 set_bit(rsc->flags, pe_rsc_failed);
1984                 stop_action(rsc, node, FALSE);
1985             }
1986             break;
1987 
1988         case action_fail_restart_container:
1989             set_bit(rsc->flags, pe_rsc_failed);
1990 
1991             if (rsc->container) {
1992                 stop_action(rsc->container, node, FALSE);
1993             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
1994                 stop_action(rsc, node, FALSE);
1995             }
1996             break;
1997 
1998         case action_fail_reset_remote:
1999             set_bit(rsc->flags, pe_rsc_failed);
2000             if (is_set(data_set->flags, pe_flag_stonith_enabled)) {
2001                 tmpnode = NULL;
2002                 if (rsc->is_remote_node) {
2003                     tmpnode = pe_find_node(data_set->nodes, rsc->id);
2004                 }
2005                 if (tmpnode &&
2006                     is_baremetal_remote_node(tmpnode) &&
2007                     tmpnode->details->remote_was_fenced == 0) {
2008 
2009                     /* The connection resource to the baremetal remote node failed
2010                      * in a way that should result in fencing the remote node. */
2011                     pe_fence_node(data_set, tmpnode,
2012                                   "remote connection is unrecoverable");
2013                 }
2014             }
2015 
2016             /* Require the stop action regardless of whether fencing is occurring. */
2017             if (rsc->role > RSC_ROLE_STOPPED) {
2018                 stop_action(rsc, node, FALSE);
2019             }
2020 
2021             /* if reconnect delay is in use, prevent the connection from exiting the
2022              * "STOPPED" role until the failure is cleared by the delay timeout. */
2023             if (rsc->remote_reconnect_interval) {
2024                 rsc->next_role = RSC_ROLE_STOPPED;
2025             }
2026             break;
2027     }
2028 
2029     /* Ensure a remote-node connection failure forces an unclean remote node
2030      * to be fenced. By setting unseen = FALSE, the remote-node failure will
2031      * result in a fencing operation regardless of whether we are going to
2032      * attempt to reconnect to the remote node in this transition. */
2033     if (is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
2034         tmpnode = pe_find_node(data_set->nodes, rsc->id);
2035         if (tmpnode && tmpnode->details->unclean) {
2036             tmpnode->details->unseen = FALSE;
2037         }
2038     }
2039 
2040     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
2041         if (is_set(rsc->flags, pe_rsc_orphan)) {
2042             if (is_set(rsc->flags, pe_rsc_managed)) {
2043                 crm_config_warn("Detected active orphan %s running on %s",
2044                                 rsc->id, node->details->uname);
2045             } else {
2046                 crm_config_warn("Cluster configured not to stop active orphans."
2047                                 " %s must be stopped manually on %s",
2048                                 rsc->id, node->details->uname);
2049             }
2050         }
2051 
2052         native_add_running(rsc, node, data_set);
2053         if (on_fail != action_fail_ignore) {
2054             set_bit(rsc->flags, pe_rsc_failed);
2055         }
2056 
2057     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
2058         /* Only do this for older status sections that included instance numbers.
2059          * Otherwise, stopped instances will appear as orphans.
2060          */
2061         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
2062         free(rsc->clone_name);
2063         rsc->clone_name = NULL;
2064 
2065     } else {
2066         char *key = stop_key(rsc);
2067         GListPtr possible_matches = find_actions(rsc->actions, key, node);
2068         GListPtr gIter = possible_matches;
2069 
2070         for (; gIter != NULL; gIter = gIter->next) {
2071             action_t *stop = (action_t *) gIter->data;
2072 
2073             stop->flags |= pe_action_optional;
2074         }
2075 
2076         g_list_free(possible_matches);
2077         free(key);
2078     }
2079 }
2080 
2081 /* create active recurring operations as optional */
2082 static void
2083 process_recurring(node_t * node, resource_t * rsc,
2084                   int start_index, int stop_index,
2085                   GListPtr sorted_op_list, pe_working_set_t * data_set)
2086 {
2087     int counter = -1;
2088     const char *task = NULL;
2089     const char *status = NULL;
2090     GListPtr gIter = sorted_op_list;
2091 
2092     CRM_ASSERT(rsc);
2093     pe_rsc_trace(rsc, "%s: Start index %d, stop index = %d", rsc->id, start_index, stop_index);
2094 
2095     for (; gIter != NULL; gIter = gIter->next) {
2096         xmlNode *rsc_op = (xmlNode *) gIter->data;
2097 
2098         int interval = 0;
2099         char *key = NULL;
2100         const char *id = ID(rsc_op);
2101         const char *interval_s = NULL;
2102 
2103         counter++;
2104 
2105         if (node->details->online == FALSE) {
2106             pe_rsc_trace(rsc, "Skipping %s/%s: node is offline", rsc->id, node->details->uname);
2107             break;
2108 
2109             /* Need to check if there's a monitor for role="Stopped" */
2110         } else if (start_index < stop_index && counter <= stop_index) {
2111             pe_rsc_trace(rsc, "Skipping %s/%s: resource is not active", id, node->details->uname);
2112             continue;
2113 
2114         } else if (counter < start_index) {
2115             pe_rsc_trace(rsc, "Skipping %s/%s: old %d", id, node->details->uname, counter);
2116             continue;
2117         }
2118 
2119         interval_s = crm_element_value(rsc_op, XML_LRM_ATTR_INTERVAL);
2120         interval = crm_parse_int(interval_s, "0");
2121         if (interval == 0) {
2122             pe_rsc_trace(rsc, "Skipping %s/%s: non-recurring", id, node->details->uname);
2123             continue;
2124         }
2125 
2126         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2127         if (safe_str_eq(status, "-1")) {
2128             pe_rsc_trace(rsc, "Skipping %s/%s: status", id, node->details->uname);
2129             continue;
2130         }
2131         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2132         /* create the action */
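        /* The op key follows the usual "<rsc>_<task>_<interval>" form,
         * e.g. "rsc1_monitor_10000" for a 10s monitor (illustrative). */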
2133         key = generate_op_key(rsc->id, task, interval);
2134         pe_rsc_trace(rsc, "Creating %s/%s", key, node->details->uname);
2135         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
2136     }
2137 }
2138 
2139 void
2140 calculate_active_ops(GListPtr sorted_op_list, int *start_index, int *stop_index)
2141 {
2142     int counter = -1;
2143     int implied_monitor_start = -1;
2144     int implied_master_start = -1;
2145     const char *task = NULL;
2146     const char *status = NULL;
2147     GListPtr gIter = sorted_op_list;
2148 
2149     *stop_index = -1;
2150     *start_index = -1;
2151 
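    /* Scan the history oldest-to-newest: the latest successful stop and the
     * latest start/migrate_from bracket the resource's current activity.
     * If no explicit start was recorded, a successful monitor (rc 0 or 8) or
     * a promote/demote serves as an implied start below.
     */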
2152     for (; gIter != NULL; gIter = gIter->next) {
2153         xmlNode *rsc_op = (xmlNode *) gIter->data;
2154 
2155         counter++;
2156 
2157         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2158         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
2159 
2160         if (safe_str_eq(task, CRMD_ACTION_STOP)
2161             && safe_str_eq(status, "0")) {
2162             *stop_index = counter;
2163 
2164         } else if (safe_str_eq(task, CRMD_ACTION_START) || safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
2165             *start_index = counter;
2166 
2167         } else if ((implied_monitor_start <= *stop_index) && safe_str_eq(task, CRMD_ACTION_STATUS)) {
2168             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
2169 
2170             if (safe_str_eq(rc, "0") || safe_str_eq(rc, "8")) {
2171                 implied_monitor_start = counter;
2172             }
2173         } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE) || safe_str_eq(task, CRMD_ACTION_DEMOTE)) {
2174             implied_master_start = counter;
2175         }
2176     }
2177 
2178     if (*start_index == -1) {
2179         if (implied_master_start != -1) {
2180             *start_index = implied_master_start;
2181         } else if (implied_monitor_start != -1) {
2182             *start_index = implied_monitor_start;
2183         }
2184     }
2185 }
2186 
2187 static resource_t *
2188 unpack_lrm_rsc_state(node_t * node, xmlNode * rsc_entry, pe_working_set_t * data_set)
2189 {
2190     GListPtr gIter = NULL;
2191     int stop_index = -1;
2192     int start_index = -1;
2193     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
2194 
2195     const char *task = NULL;
2196     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2197 
2198     resource_t *rsc = NULL;
2199     GListPtr op_list = NULL;
2200     GListPtr sorted_op_list = NULL;
2201 
2202     xmlNode *migrate_op = NULL;
2203     xmlNode *rsc_op = NULL;
2204     xmlNode *last_failure = NULL;
2205 
2206     enum action_fail_response on_fail = action_fail_ignore;
2207     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
2208 
2209     crm_trace("[%s] Processing %s on %s",
2210               crm_element_name(rsc_entry), rsc_id, node->details->uname);
2211 
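    /* rsc_entry is an <lrm_resource> block from this node's status section,
     * e.g. (illustrative):
     *   <lrm_resource id="rsc1" type="Dummy" class="ocf" provider="heartbeat">
     *     <lrm_rsc_op id="rsc1_last_0" operation="start" rc-code="0" ... />
     *   </lrm_resource>
     */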
2212     /* extract operations */
2213     op_list = NULL;
2214     sorted_op_list = NULL;
2215 
2216     for (rsc_op = __xml_first_child(rsc_entry); rsc_op != NULL; rsc_op = __xml_next_element(rsc_op)) {
2217         if (crm_str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP, TRUE)) {
2218             op_list = g_list_prepend(op_list, rsc_op);
2219         }
2220     }
2221 
2222     if (op_list == NULL) {
2223         /* if there are no operations, there is nothing to do */
2224         return NULL;
2225     }
2226 
2227     /* find the resource */
2228     rsc = unpack_find_resource(data_set, node, rsc_id, rsc_entry);
2229     if (rsc == NULL) {
2230         rsc = process_orphan_resource(rsc_entry, node, data_set);
2231     }
2232     CRM_ASSERT(rsc != NULL);
2233 
2234     /* process operations */
2235     saved_role = rsc->role;
2236     on_fail = action_fail_ignore;
2237     rsc->role = RSC_ROLE_UNKNOWN;
2238     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
2239 
2240     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
2241         xmlNode *rsc_op = (xmlNode *) gIter->data;
2242 
2243         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
2244         if (safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
2245             migrate_op = rsc_op;
2246         }
2247 
2248         unpack_rsc_op(rsc, node, rsc_op, &last_failure, &on_fail, data_set);
2249     }
2250 
2251     /* create active recurring operations as optional */
2252     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
2253     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
2254 
2255     /* no need to free the contents */
2256     g_list_free(sorted_op_list);
2257 
2258     process_rsc_state(rsc, node, on_fail, migrate_op, data_set);
2259 
2260     if (get_target_role(rsc, &req_role)) {
2261         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
2262             pe_rsc_debug(rsc, "%s: Overwriting calculated next role %s"
2263                          " with requested next role %s",
2264                          rsc->id, role2text(rsc->next_role), role2text(req_role));
2265             rsc->next_role = req_role;
2266 
2267         } else if (req_role > rsc->next_role) {
2268             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
2269                         " with requested next role %s",
2270                         rsc->id, role2text(rsc->next_role), role2text(req_role));
2271         }
2272     }
2273 
2274     if (saved_role > rsc->role) {
2275         rsc->role = saved_role;
2276     }
2277 
2278     return rsc;
2279 }
2280 
2281 static void
2282 handle_orphaned_container_fillers(xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
2283 {
2284     xmlNode *rsc_entry = NULL;
2285     for (rsc_entry = __xml_first_child(lrm_rsc_list); rsc_entry != NULL;
2286         rsc_entry = __xml_next_element(rsc_entry)) {
2287 
2288         resource_t *rsc;
2289         resource_t *container;
2290         const char *rsc_id;
2291         const char *container_id;
2292 
2293         if (safe_str_neq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE)) {
2294             continue;
2295         }
2296 
2297         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
2298         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
2299         if (container_id == NULL || rsc_id == NULL) {
2300             continue;
2301         }
2302 
2303         container = pe_find_resource(data_set->resources, container_id);
2304         if (container == NULL) {
2305             continue;
2306         }
2307 
2308         rsc = pe_find_resource(data_set->resources, rsc_id);
2309         if (rsc == NULL ||
2310             is_set(rsc->flags, pe_rsc_orphan_container_filler) == FALSE ||
2311             rsc->container != NULL) {
2312             continue;
2313         }
2314 
2315         pe_rsc_trace(rsc, "Mapped orphaned rsc %s's container to %s", rsc->id, container_id);
2316         rsc->container = container;
2317         container->fillers = g_list_append(container->fillers, rsc);
2318     }
2319 }
2320 
2321 gboolean
2322 unpack_lrm_resources(node_t * node, xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
2323 {
2324     xmlNode *rsc_entry = NULL;
2325     gboolean found_orphaned_container_filler = FALSE;
2326 
2327     CRM_CHECK(node != NULL, return FALSE);
2328 
2329     crm_trace("Unpacking resources on %s", node->details->uname);
2330 
2331     for (rsc_entry = __xml_first_child(lrm_rsc_list); rsc_entry != NULL;
2332          rsc_entry = __xml_next_element(rsc_entry)) {
2333 
2334         if (crm_str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, TRUE)) {
2335             resource_t *rsc = unpack_lrm_rsc_state(node, rsc_entry, data_set);
2336             if (!rsc) {
2337                 continue;
2338             }
2339             if (is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
2340                 found_orphaned_container_filler = TRUE;
2341             }
2342         }
2343     }
2344 
2345     /* Now that all the resource state has been unpacked for this node,
2346      * we have to go back and map any orphaned container fillers to their
2347      * container resource. */
2348     if (found_orphaned_container_filler) {
2349         handle_orphaned_container_fillers(lrm_rsc_list, data_set);
2350     }
2351     return TRUE;
2352 }
2353 
2354 static void
2355 set_active(resource_t * rsc)
2356 {
2357     resource_t *top = uber_parent(rsc);
2358 
2359     if (top && top->variant == pe_master) {
2360         rsc->role = RSC_ROLE_SLAVE;
2361     } else {
2362         rsc->role = RSC_ROLE_STARTED;
2363     }
2364 }
2365 
2366 static void
2367 set_node_score(gpointer key, gpointer value, gpointer user_data)
2368 {
2369     node_t *node = value;
2370     int *score = user_data;
2371 
2372     node->weight = *score;
2373 }
2374 
2375 #define STATUS_PATH_MAX 1024
2376 static xmlNode *
2377 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
2378             pe_working_set_t * data_set)
2379 {
2380     int offset = 0;
2381     char xpath[STATUS_PATH_MAX];
2382 
2383     offset += snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//node_state[@uname='%s']", node);
2384     offset +=
2385         snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//" XML_LRM_TAG_RESOURCE "[@id='%s']",
2386                  resource);
2387 
2388     /* Need to check against transition_magic too? */
2389     if (source && safe_str_eq(op, CRMD_ACTION_MIGRATE)) {
2390         offset +=
2391             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2392                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_target='%s']", op,
2393                      source);
2394     } else if (source && safe_str_eq(op, CRMD_ACTION_MIGRATED)) {
2395         offset +=
2396             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2397                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_source='%s']", op,
2398                      source);
2399     } else {
2400         offset +=
2401             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
2402                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s']", op);
2403     }
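    /* Illustrative result for a stop operation with no migration source:
     *   //node_state[@uname='node1']//lrm_resource[@id='rsc1']/lrm_rsc_op[@operation='stop']
     */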
2404 
2405     CRM_LOG_ASSERT(offset > 0);
2406     return get_xpath_object(xpath, data_set->input, LOG_DEBUG);
2407 }
2408 
2409 static void
2410 unpack_rsc_migration(resource_t *rsc, node_t *node, xmlNode *xml_op, pe_working_set_t * data_set)
2411 {
2412 
2413     /*
2414      * The normal sequence is (now): migrate_to(Src) -> migrate_from(Tgt) -> stop(Src)
2415      *
2416      * So if a migrate_to is followed by a stop, we don't need to care what
2417      * happened on the target node.
2418      *
2419      * Without the stop, we need to look for a successful migrate_from.
2420      * That would also imply we're no longer running on the source.
2421      *
2422      * Without the stop and without a migrate_from op, we make sure the resource
2423      * gets stopped on both source and target (assuming the target is up).
2424      *
2425      */
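    /* Rough summary of the cases handled below, assuming any stop on the
     * source is older than this migrate_to:
     *   - successful migrate_from recorded -> dangling migration; only the
     *     source needs a stop
     *   - failed migrate_from recorded     -> treat the resource as active on
     *     the target (if the target is online)
     *   - no migrate_from recorded         -> partial migration; remember the
     *     source and target, or force a restart if the target is down
     */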
2426     int stop_id = 0;
2427     int task_id = 0;
2428     xmlNode *stop_op =
2429         find_lrm_op(rsc->id, CRMD_ACTION_STOP, node->details->id, NULL, data_set);
2430 
2431     if (stop_op) {
2432         crm_element_value_int(stop_op, XML_LRM_ATTR_CALLID, &stop_id);
2433     }
2434 
2435     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
2436 
2437     if (stop_op == NULL || stop_id < task_id) {
2438         int from_rc = 0, from_status = 0;
2439         const char *migrate_source =
2440             crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2441         const char *migrate_target =
2442             crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2443 
2444         node_t *target = pe_find_node(data_set->nodes, migrate_target);
2445         node_t *source = pe_find_node(data_set->nodes, migrate_source);
2446         xmlNode *migrate_from =
2447             find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, migrate_target, migrate_source,
2448                         data_set);
2449 
2450         rsc->role = RSC_ROLE_STARTED;       /* can be master? */
2451         if (migrate_from) {
2452             crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
2453             crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS, &from_status);
2454             pe_rsc_trace(rsc, "%s op on %s exited with status=%d, rc=%d",
2455                          ID(migrate_from), migrate_target, from_status, from_rc);
2456         }
2457 
2458         if (migrate_from && from_rc == PCMK_OCF_OK
2459             && from_status == PCMK_LRM_OP_DONE) {
2460             pe_rsc_trace(rsc, "Detected dangling migration op: %s on %s", ID(xml_op),
2461                          migrate_source);
2462 
2463             /* all good
2464              * just need to arrange for the stop action to get sent
2465              * but _without_ affecting the target somehow
2466              */
2467             rsc->role = RSC_ROLE_STOPPED;
2468             rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2469 
2470         } else if (migrate_from) {  /* Failed */
2471             if (target && target->details->online) {
2472                 pe_rsc_trace(rsc, "Marking active on %s %p %d", migrate_target, target,
2473                              target->details->online);
2474                 native_add_running(rsc, target, data_set);
2475             }
2476 
2477         } else {    /* Pending or complete but erased */
2478             if (target && target->details->online) {
2479                 pe_rsc_trace(rsc, "Marking active on %s %p %d", migrate_target, target,
2480                              target->details->online);
2481 
2482                 native_add_running(rsc, target, data_set);
2483                 if (source && source->details->online) {
2484                     /* If we make it here we have a partial migration.  The migrate_to
2485                      * has completed but the migrate_from on the target has not. Hold on
2486                      * to the target and source on the resource. Later on if we detect that
2487                      * the resource is still going to run on that target, we may continue
2488                      * the migration */
2489                     rsc->partial_migration_target = target;
2490                     rsc->partial_migration_source = source;
2491                 }
2492             } else {
2493                 /* Consider it failed here - forces a restart, prevents migration */
2494                 set_bit(rsc->flags, pe_rsc_failed);
2495                 clear_bit(rsc->flags, pe_rsc_allow_migrate);
2496             }
2497         }
2498     }
2499 }
2500 
2501 static void
2502 unpack_rsc_migration_failure(resource_t *rsc, node_t *node, xmlNode *xml_op, pe_working_set_t * data_set) 
2503 {
2504     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
2505 
2506     CRM_ASSERT(rsc);
2507     if (safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
2508         int stop_id = 0;
2509         int migrate_id = 0;
2510         const char *migrate_source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2511         const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2512 
2513         xmlNode *stop_op =
2514             find_lrm_op(rsc->id, CRMD_ACTION_STOP, migrate_source, NULL, data_set);
2515         xmlNode *migrate_op =
2516             find_lrm_op(rsc->id, CRMD_ACTION_MIGRATE, migrate_source, migrate_target,
2517                         data_set);
2518 
2519         if (stop_op) {
2520             crm_element_value_int(stop_op, XML_LRM_ATTR_CALLID, &stop_id);
2521         }
2522         if (migrate_op) {
2523             crm_element_value_int(migrate_op, XML_LRM_ATTR_CALLID, &migrate_id);
2524         }
2525 
2526         /* Get our state right */
2527         rsc->role = RSC_ROLE_STARTED;   /* can be master? */
2528 
2529         if (stop_op == NULL || stop_id < migrate_id) {
2530             node_t *source = pe_find_node(data_set->nodes, migrate_source);
2531 
2532             if (source && source->details->online) {
2533                 native_add_running(rsc, source, data_set);
2534             }
2535         }
2536 
2537     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE)) {
2538         int stop_id = 0;
2539         int migrate_id = 0;
2540         const char *migrate_source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
2541         const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
2542 
2543         xmlNode *stop_op =
2544             find_lrm_op(rsc->id, CRMD_ACTION_STOP, migrate_target, NULL, data_set);
2545         xmlNode *migrate_op =
2546             find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, migrate_target, migrate_source,
2547                         data_set);
2548 
2549         if (stop_op) {
2550             crm_element_value_int(stop_op, XML_LRM_ATTR_CALLID, &stop_id);
2551         }
2552         if (migrate_op) {
2553             crm_element_value_int(migrate_op, XML_LRM_ATTR_CALLID, &migrate_id);
2554         }
2555 
2556         /* Get our state right */
2557         rsc->role = RSC_ROLE_STARTED;   /* can be master? */
2558 
2559         if (stop_op == NULL || stop_id < migrate_id) {
2560             node_t *target = pe_find_node(data_set->nodes, migrate_target);
2561 
2562             pe_rsc_trace(rsc, "Stop: %p %d, Migrated: %p %d", stop_op, stop_id, migrate_op,
2563                          migrate_id);
2564             if (target && target->details->online) {
2565                 native_add_running(rsc, target, data_set);
2566             }
2567 
2568         } else if (migrate_op == NULL) {
2569             /* Make sure it gets cleaned up; the stop may pre-date the migrate_from */
2570             rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
2571         }
2572     }
2573 }
2574 
2575 static void
2576 record_failed_op(xmlNode *op, node_t* node, resource_t *rsc, pe_working_set_t * data_set)
2577 {
2578     xmlNode *xIter = NULL;
2579     const char *op_key = crm_element_value(op, XML_LRM_ATTR_TASK_KEY);
2580 
2581     if (node->details->online == FALSE) {
2582         return;
2583     }
2584 
2585     for (xIter = data_set->failed->children; xIter; xIter = xIter->next) {
2586         const char *key = crm_element_value(xIter, XML_LRM_ATTR_TASK_KEY);
2587         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
2588 
2589         if(safe_str_eq(op_key, key) && safe_str_eq(uname, node->details->uname)) {
2590             crm_trace("Skipping duplicate entry %s on %s", op_key, node->details->uname);
2591             return;
2592         }
2593     }
2594 
2595     crm_trace("Adding entry %s on %s", op_key, node->details->uname);
2596     crm_xml_add(op, XML_ATTR_UNAME, node->details->uname);
2597     crm_xml_add(op, XML_LRM_ATTR_RSCID, rsc->id);
2598     add_node_copy(data_set->failed, op);
2599 }
2600 
2601 static const char *get_op_key(xmlNode *xml_op)
2602 {
2603     const char *key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY);
2604     if(key == NULL) {
2605         key = ID(xml_op);
2606     }
2607     return key;
2608 }
2609 
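     /* Process a failed operation: remember it as the last failure, record it
      * in data_set->failed when appropriate, update *on_fail with the action's
      * on-fail setting when that takes precedence, adjust the resource's role
      * and next role for the failed task, and, when the fail role is "stopped",
      * ban the resource (or its anonymous clone parent) from every node.
      */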
2610 static void
2611 unpack_rsc_op_failure(resource_t * rsc, node_t * node, int rc, xmlNode * xml_op, xmlNode ** last_failure,
2612                       enum action_fail_response * on_fail, pe_working_set_t * data_set)
2613 {
2614     int interval = 0;
2615     bool is_probe = FALSE;
2616     action_t *action = NULL;
2617 
2618     const char *key = get_op_key(xml_op);
2619     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
2620     const char *op_version = crm_element_value(xml_op, XML_ATTR_CRM_VERSION);
2621 
2622     CRM_ASSERT(rsc);
2623 
2624     *last_failure = xml_op;
2625 
2626     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
2627     if(interval == 0 && safe_str_eq(task, CRMD_ACTION_STATUS)) {
2628         is_probe = TRUE;
2629         pe_rsc_trace(rsc, "is a probe: %s", key);
2630     }
2631 
2632     if (rc != PCMK_OCF_NOT_INSTALLED || is_set(data_set->flags, pe_flag_symmetric_cluster)) {
2633         crm_warn("Processing failed op %s for %s on %s: %s (%d)",
2634                  task, rsc->id, node->details->uname, services_ocf_exitcode_str(rc),
2635                  rc);
2636 
2637         record_failed_op(xml_op, node, rsc, data_set);
2638 
2639     } else {
2640         crm_trace("Processing failed op %s for %s on %s: %s (%d)",
2641                  task, rsc->id, node->details->uname, services_ocf_exitcode_str(rc),
2642                  rc);
2643     }
2644 
2645     action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
2646     if ((action->on_fail <= action_fail_fence && *on_fail < action->on_fail) ||
2647         (action->on_fail == action_fail_reset_remote && *on_fail <= action_fail_recover) ||
2648         (action->on_fail == action_fail_restart_container && *on_fail <= action_fail_recover) ||
2649         (*on_fail == action_fail_restart_container && action->on_fail >= action_fail_migrate)) {
2650         pe_rsc_trace(rsc, "on-fail %s -> %s for %s (%s)", fail2text(*on_fail),
2651                      fail2text(action->on_fail), action->uuid, key);
2652         *on_fail = action->on_fail;
2653     }
2654 
2655     if (safe_str_eq(task, CRMD_ACTION_STOP)) {
2656         resource_location(rsc, node, -INFINITY, "__stop_fail__", data_set);
2657 
2658     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE) || safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
2659         unpack_rsc_migration_failure(rsc, node, xml_op, data_set);
2660 
2661     } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE)) {
2662         rsc->role = RSC_ROLE_MASTER;
2663 
2664     } else if (safe_str_eq(task, CRMD_ACTION_DEMOTE)) {
2665         /*
2666          * Staying in role=master would put the PE/TE into a loop.
2667          * Setting role=slave is not dangerous because no master will be
2668          * promoted until the failed resource has been fully stopped.
2669          */
2670         if (action->on_fail == action_fail_block) {
2671             rsc->role = RSC_ROLE_MASTER;
2672             rsc->next_role = RSC_ROLE_STOPPED;
2673 
2674         } else if(rc == PCMK_OCF_NOT_RUNNING) {
2675             rsc->role = RSC_ROLE_STOPPED;
2676 
2677         } else {
2678             crm_warn("Forcing %s to stop after a failed demote action", rsc->id);
2679             rsc->role = RSC_ROLE_SLAVE;
2680             rsc->next_role = RSC_ROLE_STOPPED;
2681         }
2682 
2683     } else if (compare_version("2.0", op_version) > 0 && safe_str_eq(task, CRMD_ACTION_START)) {
2684         crm_warn("Compatibility handling for failed op %s on %s", key, node->details->uname);
2685         resource_location(rsc, node, -INFINITY, "__legacy_start__", data_set);
2686     }
2687 
2688     if(is_probe && rc == PCMK_OCF_NOT_INSTALLED) {
2689         /* leave stopped */
2690         pe_rsc_trace(rsc, "Leaving %s stopped", rsc->id);
2691         rsc->role = RSC_ROLE_STOPPED;
2692 
2693     } else if (rsc->role < RSC_ROLE_STARTED) {
2694         pe_rsc_trace(rsc, "Setting %s active", rsc->id);
2695         set_active(rsc);
2696     }
2697 
2698     pe_rsc_trace(rsc, "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
2699                  rsc->id, role2text(rsc->role),
2700                  node->details->unclean ? "true" : "false",
2701                  fail2text(action->on_fail), role2text(action->fail_role));
2702 
2703     if (action->fail_role != RSC_ROLE_STARTED && rsc->next_role < action->fail_role) {
2704         rsc->next_role = action->fail_role;
2705     }
2706 
2707     if (action->fail_role == RSC_ROLE_STOPPED) {
2708         int score = -INFINITY;
2709 
2710         resource_t *fail_rsc = rsc;
2711 
2712         if (fail_rsc->parent) {
2713             resource_t *parent = uber_parent(fail_rsc);
2714 
2715             if (pe_rsc_is_clone(parent)
2716                 && is_not_set(parent->flags, pe_rsc_unique)) {
2717                 /* For clone and master resources, if a child fails an operation
2718                  * with on-fail=stop, every instance must stop.  Do this by preventing
2719                  * the parent from coming up again. */
2720                 fail_rsc = parent;
2721             }
2722         }
2723         crm_warn("Making sure %s doesn't come up again", fail_rsc->id);
2724         /* make sure it doesn't come up again */
2725         g_hash_table_destroy(fail_rsc->allowed_nodes);
2726         fail_rsc->allowed_nodes = node_hash_from_list(data_set->nodes);
2727         g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
2728     }
2729 
2730     pe_free_action(action);
2731 }
2732 
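     /* Map an operation's OCF return code, compared against the expected rc,
      * to a PCMK_LRM_OP_* status, updating the resource's role and *on_fail as
      * a side effect for results such as "not running" or "running master".
      */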
2733 static int
2734 determine_op_status(
2735     resource_t *rsc, int rc, int target_rc, node_t * node, xmlNode * xml_op, enum action_fail_response * on_fail, pe_working_set_t * data_set) 
2736 {
2737     int interval = 0;
2738     int result = PCMK_LRM_OP_DONE;
2739 
2740     const char *key = get_op_key(xml_op);
2741     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
2742 
2743     bool is_probe = FALSE;
2744 
2745     CRM_ASSERT(rsc);
2746     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
2747     if (interval == 0 && safe_str_eq(task, CRMD_ACTION_STATUS)) {
2748         is_probe = TRUE;
2749     }
2750 
2751     if (target_rc >= 0 && target_rc != rc) {
2752         result = PCMK_LRM_OP_ERROR;
2753         pe_rsc_debug(rsc, "%s on %s returned '%s' (%d) instead of the expected value: '%s' (%d)",
2754                      key, node->details->uname,
2755                      services_ocf_exitcode_str(rc), rc,
2756                      services_ocf_exitcode_str(target_rc), target_rc);
2757     }
2758 
2759     /* we could clean this up significantly except for old LRMs and CRMs that
2760      * didn't include target_rc and liked to remap status
2761      */
2762     switch (rc) {
2763         case PCMK_OCF_OK:
2764             if (is_probe && target_rc == 7) {
2765                 result = PCMK_LRM_OP_DONE;
2766                 pe_rsc_info(rsc, "Operation %s found resource %s active on %s",
2767                             task, rsc->id, node->details->uname);
2768 
2769                 /* legacy code for pre-0.6.5 operations */
2770             } else if (target_rc < 0 && interval > 0 && rsc->role == RSC_ROLE_MASTER) {
2771                 /* catch status ops that return 0 instead of 8 while they
2772                  *   are supposed to be in master mode
2773                  */
2774                 result = PCMK_LRM_OP_ERROR;
2775             }
2776             break;
2777 
2778         case PCMK_OCF_NOT_RUNNING:
2779             if (is_probe || target_rc == rc || is_not_set(rsc->flags, pe_rsc_managed)) {
2780                 result = PCMK_LRM_OP_DONE;
2781                 rsc->role = RSC_ROLE_STOPPED;
2782 
2783                 /* clear any previous failure actions */
2784                 *on_fail = action_fail_ignore;
2785                 rsc->next_role = RSC_ROLE_UNKNOWN;
2786 
2787             } else if (safe_str_neq(task, CRMD_ACTION_STOP)) {
2788                 result = PCMK_LRM_OP_ERROR;
2789             }
2790             break;
2791 
2792         case PCMK_OCF_RUNNING_MASTER:
2793             if (is_probe) {
2794                 result = PCMK_LRM_OP_DONE;
2795                 pe_rsc_info(rsc, "Operation %s found resource %s active in master mode on %s",
2796                             task, rsc->id, node->details->uname);
2797 
2798             } else if (target_rc == rc) {
2799                 /* nothing to do */
2800 
2801             } else if (target_rc >= 0) {
2802                 result = PCMK_LRM_OP_ERROR;
2803 
2804                 /* legacy code for pre-0.6.5 operations */
2805             } else if (safe_str_neq(task, CRMD_ACTION_STATUS)
2806                        || rsc->role != RSC_ROLE_MASTER) {
2807                 result = PCMK_LRM_OP_ERROR;
2808                 if (rsc->role != RSC_ROLE_MASTER) {
2809                     crm_err("%s reported %s in master mode on %s",
2810                             key, rsc->id, node->details->uname);
2811                 }
2812             }
2813             rsc->role = RSC_ROLE_MASTER;
2814             break;
2815 
2816         case PCMK_OCF_DEGRADED_MASTER:
2817         case PCMK_OCF_FAILED_MASTER:
2818             rsc->role = RSC_ROLE_MASTER;
2819             result = PCMK_LRM_OP_ERROR;
2820             break;
2821 
2822         case PCMK_OCF_NOT_CONFIGURED:
2823             result = PCMK_LRM_OP_ERROR_FATAL;
2824             break;
2825 
2826         case PCMK_OCF_NOT_INSTALLED:
2827         case PCMK_OCF_INVALID_PARAM:
2828         case PCMK_OCF_INSUFFICIENT_PRIV:
2829         case PCMK_OCF_UNIMPLEMENT_FEATURE:
2830             if (rc == PCMK_OCF_UNIMPLEMENT_FEATURE && interval > 0) {
2831                 result = PCMK_LRM_OP_NOTSUPPORTED;
2832                 break;
2833 
2834             } else if (pe_can_fence(data_set, node) == FALSE
2835                && safe_str_eq(task, CRMD_ACTION_STOP)) {
2836                 /* If a stop fails and we can't fence, there's nothing else we can do */
2837                 pe_proc_err("No further recovery can be attempted for %s: %s action failed with '%s' (%d)",
2838                             rsc->id, task, services_ocf_exitcode_str(rc), rc);
2839                 clear_bit(rsc->flags, pe_rsc_managed);
2840                 set_bit(rsc->flags, pe_rsc_block);
2841             }
2842             result = PCMK_LRM_OP_ERROR_HARD;
2843             break;
2844 
2845         default:
2846             if (result == PCMK_LRM_OP_DONE) {
2847                 crm_info("Treating %s (rc=%d) on %s as an ERROR",
2848                          key, rc, node->details->uname);
2849                 result = PCMK_LRM_OP_ERROR;
2850             }
2851     }
2852 
2853     return result;
2854 }
2855 
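     /* Determine whether a failure has expired under the resource's
      * failure-timeout (recurring monitor failures of remote connection
      * resources are held until the remote node has been fenced), schedule a
      * CRM_OP_CLEAR_FAILCOUNT action when a reason to clear the failure is
      * found, and never expire probes whose result still reports the
      * resource's state.
      */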
2856 static bool check_operation_expiry(resource_t *rsc, node_t *node, int rc, xmlNode *xml_op, pe_working_set_t * data_set)
2857 {
2858     bool expired = FALSE;
2859     time_t last_failure = 0;
2860     int interval = 0;
2861     int failure_timeout = rsc->failure_timeout;
2862     const char *key = get_op_key(xml_op);
2863     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
2864     const char *clear_reason = NULL;
2865 
2866     /* clearing recurring monitor operation failures automatically
2867      * needs to be carefully considered */
2868     if (safe_str_eq(crm_element_value(xml_op, XML_LRM_ATTR_TASK), "monitor") &&
2869         safe_str_neq(crm_element_value(xml_op, XML_LRM_ATTR_INTERVAL), "0")) {
2870 
2871         /* TODO: in the future we should consider not clearing recurring monitor
2872          * op failures unless the last action for the resource was a "stop" action.
2873          * Otherwise it is possible that clearing the monitor failure will leave
2874          * the resource in a nondeterministic state.
2875          *
2876          * For now we handle this potential nondeterministic condition for remote
2877          * node connection resources by not clearing a recurring monitor op failure
2878          * until after the node has been fenced. */
2879 
2880         if (is_set(data_set->flags, pe_flag_stonith_enabled) &&
2881             (rsc->remote_reconnect_interval)) {
2882 
2883             node_t *remote_node = pe_find_node(data_set->nodes, rsc->id);
2884             if (remote_node && remote_node->details->remote_was_fenced == 0) {
2885                 if (strstr(ID(xml_op), "last_failure")) {
2886                     crm_info("Waiting to clear monitor failure for remote node %s until fencing has occurred", rsc->id); 
2887                 }
2888                 /* Disable the failure timeout for this operation because we believe
2889                  * fencing of the remote node should occur first. */
2890                 failure_timeout = 0;
2891             }
2892         }
2893     }
2894 
2895     if (failure_timeout > 0) {
2896         int last_run = 0;
2897 
2898         if (crm_element_value_int(xml_op, XML_RSC_OP_LAST_CHANGE, &last_run) == 0) {
2899             time_t now = get_effective_time(data_set);
2900 
2901             if (now > (last_run + failure_timeout)) {
2902                 expired = TRUE;
2903             }
2904         }
2905     }
2906 
2907     if (expired) {
2908         if (failure_timeout > 0) {
2909             if (pe_get_failcount(node, rsc, &last_failure, pe_fc_default,
2910                                  xml_op, data_set)) {
2911 
2912                 if (pe_get_failcount(node, rsc, &last_failure, pe_fc_effective,
2913                                      xml_op, data_set) == 0) {
2914                     clear_reason = "it expired";
2915                 } else {
2916                     expired = FALSE;
2917                 }
2918 
2919             } else if (rsc->remote_reconnect_interval && strstr(ID(xml_op), "last_failure")) {
2920                 /* always clear last failure when reconnect interval is set */
2921                 clear_reason = "reconnect interval is set";
2922             }
2923         }
2924 
2925     } else if (strstr(ID(xml_op), "last_failure") &&
2926                ((strcmp(task, "start") == 0) || (strcmp(task, "monitor") == 0))) {
2927 
2928         op_digest_cache_t *digest_data = NULL;
2929 
2930         digest_data = rsc_action_digest_cmp(rsc, xml_op, node, data_set);
2931 
2932         if (digest_data->rc == RSC_DIGEST_UNKNOWN) {
2933             crm_trace("rsc op %s/%s on node %s does not have an op digest to compare against", rsc->id,
2934                       key, node->details->id);
2935         } else if(container_fix_remote_addr(rsc) && digest_data->rc != RSC_DIGEST_MATCH) {
2936             // We can't sanely check the changing 'addr' attribute. Yet
2937             crm_trace("Ignoring rsc op %s/%s on node %s", rsc->id, key, node->details->id);
2938 
2939         } else if (digest_data->rc != RSC_DIGEST_MATCH) {
2940             clear_reason = "resource parameters have changed";
2941         }
2942     }
2943 
2944     if (clear_reason != NULL) {
2945         char *key = generate_op_key(rsc->id, CRM_OP_CLEAR_FAILCOUNT, 0);
2946         action_t *clear_op = custom_action(rsc, key, CRM_OP_CLEAR_FAILCOUNT,
2947                                            node, FALSE, TRUE, data_set);
2948 
2949         add_hash_param(clear_op->meta, XML_ATTR_TE_NOWAIT, XML_BOOLEAN_TRUE);
2950 
2951         crm_notice("Clearing failure of %s on %s because %s " CRM_XS " %s",
2952                    rsc->id, node->details->uname, clear_reason, clear_op->uuid);
2953     }
2954 
2955     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
2956     if(expired && interval == 0 && safe_str_eq(task, CRMD_ACTION_STATUS)) {
2957         switch(rc) {
2958             case PCMK_OCF_OK:
2959             case PCMK_OCF_NOT_RUNNING:
2960             case PCMK_OCF_RUNNING_MASTER:
2961             case PCMK_OCF_DEGRADED:
2962             case PCMK_OCF_DEGRADED_MASTER:
2963                 /* Don't expire probes that return these values */ 
2964                 expired = FALSE;
2965                 break;
2966         }
2967     }
2968 
2969     return expired;
2970 }
2971 
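     /* Extract the expected return code from the operation's transition key,
      * returning -1 when no transition key is present.
      */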
2972 int get_target_rc(xmlNode *xml_op)
2973 {
2974     int dummy = 0;
2975     int target_rc = 0;
2976     char *dummy_string = NULL;
2977     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
2978     if (key == NULL) {
2979         return -1;
2980     }
2981 
2982     decode_transition_key(key, &dummy_string, &dummy, &dummy, &target_rc);
2983     free(dummy_string);
2984 
2985     return target_rc;
2986 }
2987 
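     /* Look up the configured on-fail response for an operation by creating a
      * transient action for the given key and task, then freeing it again.
      */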
2988 static enum action_fail_response
2989 get_action_on_fail(resource_t *rsc, const char *key, const char *task, pe_working_set_t * data_set) 
2990 {
2991     int result = action_fail_recover;
2992     action_t *action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
2993 
2994     result = action->on_fail;
2995     pe_free_action(action);
2996 
2997     return result;
2998 }
2999 
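     /* Update the resource's role to reflect a successful operation (probe,
      * start, stop, promote, demote, migration) and clear earlier failure
      * handling (*on_fail and next_role) when the successful result makes it
      * obsolete.
      */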
3000 static void
3001 update_resource_state(resource_t * rsc, node_t * node, xmlNode * xml_op, const char * task, int rc,
3002                       xmlNode * last_failure, enum action_fail_response * on_fail, pe_working_set_t * data_set)
3003 {
3004     gboolean clear_past_failure = FALSE;
3005 
3006     CRM_ASSERT(rsc);
3007     CRM_ASSERT(xml_op);
3008 
3009     if (rc == PCMK_OCF_NOT_RUNNING) {
3010         clear_past_failure = TRUE;
3011 
3012     } else if (rc == PCMK_OCF_NOT_INSTALLED) {
3013         rsc->role = RSC_ROLE_STOPPED;
3014 
3015     } else if (safe_str_eq(task, CRMD_ACTION_STATUS)) {
3016         if (last_failure) {
3017             const char *op_key = get_op_key(xml_op);
3018             const char *last_failure_key = get_op_key(last_failure);
3019 
3020             if (safe_str_eq(op_key, last_failure_key)) {
3021                 clear_past_failure = TRUE;
3022             }
3023         }
3024 
3025         if (rsc->role < RSC_ROLE_STARTED) {
3026             set_active(rsc);
3027         }
3028 
3029     } else if (safe_str_eq(task, CRMD_ACTION_START)) {
3030         rsc->role = RSC_ROLE_STARTED;
3031         clear_past_failure = TRUE;
3032 
3033     } else if (safe_str_eq(task, CRMD_ACTION_STOP)) {
3034         rsc->role = RSC_ROLE_STOPPED;
3035         clear_past_failure = TRUE;
3036 
3037     } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE)) {
3038         rsc->role = RSC_ROLE_MASTER;
3039         clear_past_failure = TRUE;
3040 
3041     } else if (safe_str_eq(task, CRMD_ACTION_DEMOTE)) {
3042         /* Demote from Master does not clear an error */
3043         rsc->role = RSC_ROLE_SLAVE;
3044 
3045     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
3046         rsc->role = RSC_ROLE_STARTED;
3047         clear_past_failure = TRUE;
3048 
3049     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE)) {
3050         unpack_rsc_migration(rsc, node, xml_op, data_set);
3051 
3052     } else if (rsc->role < RSC_ROLE_STARTED) {
3053         pe_rsc_trace(rsc, "%s active on %s", rsc->id, node->details->uname);
3054         set_active(rsc);
3055     }
3056 
3057     /* clear any previous failure actions */
3058     if (clear_past_failure) {
3059         switch (*on_fail) {
3060             case action_fail_stop:
3061             case action_fail_fence:
3062             case action_fail_migrate:
3063             case action_fail_standby:
3064                 pe_rsc_trace(rsc, "%s.%s is not cleared by a completed stop",
3065                              rsc->id, fail2text(*on_fail));
3066                 break;
3067 
3068             case action_fail_block:
3069             case action_fail_ignore:
3070             case action_fail_recover:
3071             case action_fail_restart_container:
3072                 *on_fail = action_fail_ignore;
3073                 rsc->next_role = RSC_ROLE_UNKNOWN;
3074                 break;
3075             case action_fail_reset_remote:
3076                 if (rsc->remote_reconnect_interval == 0) {
3077                     /* When no reconnect delay is in use, the connection is allowed
3078                      * to start again after the remote node is fenced and completely
3079                      * stopped. Otherwise, with a reconnect delay, we wait for the
3080                      * failure to be cleared entirely before a reconnect is attempted. */
3081                     *on_fail = action_fail_ignore;
3082                     rsc->next_role = RSC_ROLE_UNKNOWN;
3083                 }
3084                 break;
3085         }
3086     }
3087 }
3088 
3089 
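     /* Unpack a single lrm_rsc_op entry: read its task, rc and status, ignore
      * notify/metadata actions, expire old failures, remap degraded results,
      * recalculate the operation status, and dispatch to the pending, completed
      * or failure handling above.
      */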
3090 gboolean
3091 unpack_rsc_op(resource_t * rsc, node_t * node, xmlNode * xml_op, xmlNode ** last_failure,
3092               enum action_fail_response * on_fail, pe_working_set_t * data_set)
3093 {
3094     int task_id = 0;
3095 
3096     const char *key = NULL;
3097     const char *task = NULL;
3098     const char *task_key = NULL;
3099 
3100     int rc = 0;
3101     int status = PCMK_LRM_OP_PENDING-1;
3102     int target_rc = get_target_rc(xml_op);
3103     int interval = 0;
3104 
3105     gboolean expired = FALSE;
3106     resource_t *parent = rsc;
3107     enum action_fail_response failure_strategy = action_fail_recover;
3108 
3109     CRM_CHECK(rsc != NULL, return FALSE);
3110     CRM_CHECK(node != NULL, return FALSE);
3111     CRM_CHECK(xml_op != NULL, return FALSE);
3112 
3113     task_key = get_op_key(xml_op);
3114 
3115     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
3116     key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
3117 
3118     crm_element_value_int(xml_op, XML_LRM_ATTR_RC, &rc);
3119     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
3120     crm_element_value_int(xml_op, XML_LRM_ATTR_OPSTATUS, &status);
3121     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
3122 
3123     CRM_CHECK(task != NULL, return FALSE);
3124     CRM_CHECK(status <= PCMK_LRM_OP_NOT_INSTALLED, return FALSE);
3125     CRM_CHECK(status >= PCMK_LRM_OP_PENDING, return FALSE);
3126 
3127     if (safe_str_eq(task, CRMD_ACTION_NOTIFY) ||
3128         safe_str_eq(task, CRMD_ACTION_METADATA)) {
3129         /* safe to ignore these */
3130         return TRUE;
3131     }
3132 
3133     if (is_not_set(rsc->flags, pe_rsc_unique)) {
3134         parent = uber_parent(rsc);
3135     }
3136 
3137     pe_rsc_trace(rsc, "Unpacking task %s/%s (call_id=%d, status=%d, rc=%d) on %s (role=%s)",
3138                  task_key, task, task_id, status, rc, node->details->uname, role2text(rsc->role));
3139 
3140     if (node->details->unclean) {
3141         pe_rsc_trace(rsc, "Node %s (where %s is running) is unclean."
3142                      " Further action depends on the value of the stop's on-fail attribute",
3143                      node->details->uname, rsc->id);
3144     }
3145 
3146     if (status == PCMK_LRM_OP_ERROR) {
3147         /* Older versions set this if rc != 0 but it's up to us to decide */
3148         status = PCMK_LRM_OP_DONE;
3149     }
3150 
3151     if(status != PCMK_LRM_OP_NOT_INSTALLED) {
3152         expired = check_operation_expiry(rsc, node, rc, xml_op, data_set);
3153     }
3154 
3155     /* Degraded results are informational only, re-map them to their error-free equivalents */
3156     if (rc == PCMK_OCF_DEGRADED && safe_str_eq(task, CRMD_ACTION_STATUS)) {
3157         rc = PCMK_OCF_OK;
3158 
3159         /* Add them to the failed list to highlight them for the user */
3160         if ((node->details->shutdown == FALSE) || (node->details->online == TRUE)) {
3161             crm_trace("Remapping %d to %d", PCMK_OCF_DEGRADED, PCMK_OCF_OK);
3162             record_failed_op(xml_op, node, rsc, data_set);
3163         }
3164 
3165     } else if (rc == PCMK_OCF_DEGRADED_MASTER && safe_str_eq(task, CRMD_ACTION_STATUS)) {
3166         rc = PCMK_OCF_RUNNING_MASTER;
3167 
3168         /* Add them to the failed list to highlight them for the user */
3169         if ((node->details->shutdown == FALSE) || (node->details->online == TRUE)) {
3170             crm_trace("Remapping %d to %d", PCMK_OCF_DEGRADED_MASTER, PCMK_OCF_RUNNING_MASTER);
3171             record_failed_op(xml_op, node, rsc, data_set);
3172         }
3173     }
3174 
3175     if (expired && target_rc != rc) {
3176         const char *magic = crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC);
3177 
3178         pe_rsc_debug(rsc, "Expired operation '%s' on %s returned '%s' (%d) instead of the expected value: '%s' (%d)",
3179                      key, node->details->uname,
3180                      services_ocf_exitcode_str(rc), rc,
3181                      services_ocf_exitcode_str(target_rc), target_rc);
3182 
3183         if(interval == 0) {
3184             crm_notice("Ignoring expired calculated failure %s (rc=%d, magic=%s) on %s",
3185                        task_key, rc, magic, node->details->uname);
3186             goto done;
3187 
3188         } else if(node->details->online && node->details->unclean == FALSE) {
3189             crm_notice("Re-initiated expired calculated failure %s (rc=%d, magic=%s) on %s",
3190                        task_key, rc, magic, node->details->uname);
3191             /* This is SO horrible, but we don't have access to CancelXmlOp() yet */
3192             crm_xml_add(xml_op, XML_LRM_ATTR_RESTART_DIGEST, "calculated-failure-timeout");
3193             goto done;
3194         }
3195     }
3196 
3197     if(status == PCMK_LRM_OP_DONE || status == PCMK_LRM_OP_ERROR) {
3198         status = determine_op_status(rsc, rc, target_rc, node, xml_op, on_fail, data_set);
3199     }
3200 
3201     pe_rsc_trace(rsc, "Handling status: %d", status);
3202     switch (status) {
3203         case PCMK_LRM_OP_CANCELLED:
3204             /* do nothing?? */
3205             pe_err("Don't know what to do for cancelled ops yet");
3206             break;
3207 
3208         case PCMK_LRM_OP_PENDING:
3209             if (safe_str_eq(task, CRMD_ACTION_START)) {
3210                 set_bit(rsc->flags, pe_rsc_start_pending);
3211                 set_active(rsc);
3212 
3213             } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE)) {
3214                 rsc->role = RSC_ROLE_MASTER;
3215 
3216             } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE) && node->details->unclean) {
3217                 /* If a pending migrate_to action is out on an unclean node,
3218                  * we have to force the stop action on the target. */
3219                 const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
3220                 node_t *target = pe_find_node(data_set->nodes, migrate_target);
3221                 if (target) {
3222                     stop_action(rsc, target, FALSE);
3223                 }
3224             }
3225 
3226             if (rsc->pending_task == NULL) {
3227                 if (safe_str_eq(task, CRMD_ACTION_STATUS) && interval == 0) {
3228                     /* Pending probes are not printed, even if pending
3229                      * operations are requested. If someone ever requests that
3230                      * behavior, uncomment this and the corresponding part of
3231                      * native.c:native_pending_task().
3232                      */
3233                     /*rsc->pending_task = strdup("probe");*/
3234 
3235                 } else {
3236                     rsc->pending_task = strdup(task);
3237                 }
3238             }
3239             break;
3240 
3241         case PCMK_LRM_OP_DONE:
3242             pe_rsc_trace(rsc, "%s/%s completed on %s", rsc->id, task, node->details->uname);
3243             update_resource_state(rsc, node, xml_op, task, rc, *last_failure, on_fail, data_set);
3244             break;
3245 
3246         case PCMK_LRM_OP_NOT_INSTALLED:
3247             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3248             if (failure_strategy == action_fail_ignore) {
3249                 crm_warn("Cannot ignore failed %s (status=%d, rc=%d) on %s: "
3250                          "Resource agent doesn't exist",
3251                          task_key, status, rc, node->details->uname);
3252                 /* Also for printing it as "FAILED" by marking it as pe_rsc_failed later */
3253                 *on_fail = action_fail_migrate;
3254             }
3255             resource_location(parent, node, -INFINITY, "hard-error", data_set);
3256             unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
3257             break;
3258 
3259         case PCMK_LRM_OP_ERROR:
3260         case PCMK_LRM_OP_ERROR_HARD:
3261         case PCMK_LRM_OP_ERROR_FATAL:
3262         case PCMK_LRM_OP_TIMEOUT:
3263         case PCMK_LRM_OP_NOTSUPPORTED:
3264 
3265             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
3266             if ((failure_strategy == action_fail_ignore)
3267                 || (failure_strategy == action_fail_restart_container
3268                     && safe_str_eq(task, CRMD_ACTION_STOP))) {
3269 
3270                 crm_warn("Pretending the failure of %s (rc=%d) on %s succeeded",
3271                          task_key, rc, node->details->uname);
3272 
3273                 update_resource_state(rsc, node, xml_op, task, target_rc, *last_failure, on_fail, data_set);
3274                 crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
3275                 set_bit(rsc->flags, pe_rsc_failure_ignored);
3276 
3277                 record_failed_op(xml_op, node, rsc, data_set);
3278 
3279                 if (failure_strategy == action_fail_restart_container && *on_fail <= action_fail_recover) {
3280                     *on_fail = failure_strategy;
3281                 }
3282 
3283             } else {
3284                 unpack_rsc_op_failure(rsc, node, rc, xml_op, last_failure, on_fail, data_set);
3285 
3286                 if(status == PCMK_LRM_OP_ERROR_HARD) {
3287                     do_crm_log(rc != PCMK_OCF_NOT_INSTALLED?LOG_ERR:LOG_NOTICE,
3288                                "Preventing %s from re-starting on %s: operation %s failed '%s' (%d)",
3289                                parent->id, node->details->uname,
3290                                task, services_ocf_exitcode_str(rc), rc);
3291 
3292                     resource_location(parent, node, -INFINITY, "hard-error", data_set);
3293 
3294                 } else if(status == PCMK_LRM_OP_ERROR_FATAL) {
3295                     crm_err("Preventing %s from re-starting anywhere: operation %s failed '%s' (%d)",
3296                             parent->id, task, services_ocf_exitcode_str(rc), rc);
3297 
3298                     resource_location(parent, NULL, -INFINITY, "fatal-error", data_set);
3299                 }
3300             }
3301             break;
3302     }
3303 
3304   done:
3305     pe_rsc_trace(rsc, "Resource %s after %s: role=%s, next=%s", rsc->id, task, role2text(rsc->role), role2text(rsc->next_role));
3306     return TRUE;
3307 }
3308 
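     /* Populate the node's attribute table with built-in attributes (node name,
      * ID, whether it is the DC, cluster name, site name) and with the instance
      * attributes defined in the node's XML; also mark the node as DC when its
      * ID matches data_set->dc_uuid.
      */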
3309 gboolean
3310 add_node_attrs(xmlNode * xml_obj, node_t * node, gboolean overwrite, pe_working_set_t * data_set)
3311 {
3312     const char *cluster_name = NULL;
3313 
3314     g_hash_table_insert(node->details->attrs,
3315                         strdup(CRM_ATTR_UNAME), strdup(node->details->uname));
3316 
3317     g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_ID),
3318                         strdup(node->details->id));
3319     if (safe_str_eq(node->details->id, data_set->dc_uuid)) {
3320         data_set->dc_node = node;
3321         node->details->is_dc = TRUE;
3322         g_hash_table_insert(node->details->attrs,
3323                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_TRUE));
3324     } else {
3325         g_hash_table_insert(node->details->attrs,
3326                             strdup(CRM_ATTR_IS_DC), strdup(XML_BOOLEAN_FALSE));
3327     }
3328 
3329     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
3330     if (cluster_name) {
3331         g_hash_table_insert(node->details->attrs, strdup(CRM_ATTR_CLUSTER_NAME),
3332                             strdup(cluster_name));
3333     }
3334 
3335     unpack_instance_attributes(data_set->input, xml_obj, XML_TAG_ATTR_SETS, NULL,
3336                                node->details->attrs, NULL, overwrite, data_set->now);
3337 
3338     if (pe_node_attribute_raw(node, CRM_ATTR_SITE_NAME) == NULL) {
3339         const char *site_name = pe_node_attribute_raw(node, "site-name");
3340 
3341         if (site_name) {
3342             g_hash_table_insert(node->details->attrs,
3343                                 strdup(CRM_ATTR_SITE_NAME),
3344                                 strdup(site_name));
3345 
3346         } else if (cluster_name) {
3347             /* Default to cluster-name if unset */
3348             g_hash_table_insert(node->details->attrs,
3349                                 strdup(CRM_ATTR_SITE_NAME),
3350                                 strdup(cluster_name));
3351         }
3352     }
3353     return TRUE;
3354 }
3355 
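     /* Collect the lrm_rsc_op entries for one resource on one node, sorted by
      * call ID.  With active_filter, only operations since the most recent
      * start are returned (or none at all if the resource has since stopped).
      */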
3356 static GListPtr
3357 extract_operations(const char *node, const char *rsc, xmlNode * rsc_entry, gboolean active_filter)
3358 {
3359     int counter = -1;
3360     int stop_index = -1;
3361     int start_index = -1;
3362 
3363     xmlNode *rsc_op = NULL;
3364 
3365     GListPtr gIter = NULL;
3366     GListPtr op_list = NULL;
3367     GListPtr sorted_op_list = NULL;
3368 
3369     /* extract operations */
3370     op_list = NULL;
3371     sorted_op_list = NULL;
3372 
3373     for (rsc_op = __xml_first_child(rsc_entry); rsc_op != NULL; rsc_op = __xml_next_element(rsc_op)) {
3374         if (crm_str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP, TRUE)) {
3375             crm_xml_add(rsc_op, "resource", rsc);
3376             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
3377             op_list = g_list_prepend(op_list, rsc_op);
3378         }
3379     }
3380 
3381     if (op_list == NULL) {
3382         /* if there are no operations, there is nothing to do */
3383         return NULL;
3384     }
3385 
3386     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
3387 
3388     /* create active recurring operations as optional */
3389     if (active_filter == FALSE) {
3390         return sorted_op_list;
3391     }
3392 
3393     op_list = NULL;
3394 
3395     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
3396 
3397     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
3398         xmlNode *rsc_op = (xmlNode *) gIter->data;
3399 
3400         counter++;
3401 
3402         if (start_index < stop_index) {
3403             crm_trace("Skipping %s: not active", ID(rsc_entry));
3404             break;
3405 
3406         } else if (counter < start_index) {
3407             crm_trace("Skipping %s: old", ID(rsc_op));
3408             continue;
3409         }
3410         op_list = g_list_append(op_list, rsc_op);
3411     }
3412 
3413     g_list_free(sorted_op_list);
3414     return op_list;
3415 }
3416 
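     /* Walk the status section, determine each matching node's online state,
      * and return the operation history for the requested resource(s) on nodes
      * that are online (or on any node when fencing is enabled).
      */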
3417 GListPtr
3418 find_operations(const char *rsc, const char *node, gboolean active_filter,
3419                 pe_working_set_t * data_set)
3420 {
3421     GListPtr output = NULL;
3422     GListPtr intermediate = NULL;
3423 
3424     xmlNode *tmp = NULL;
3425     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
3426 
3427     node_t *this_node = NULL;
3428 
3429     xmlNode *node_state = NULL;
3430 
3431     for (node_state = __xml_first_child(status); node_state != NULL;
3432          node_state = __xml_next_element(node_state)) {
3433 
3434         if (crm_str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, TRUE)) {
3435             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
3436 
3437             if (node != NULL && safe_str_neq(uname, node)) {
3438                 continue;
3439             }
3440 
3441             this_node = pe_find_node(data_set->nodes, uname);
3442             if(this_node == NULL) {
3443                 CRM_LOG_ASSERT(this_node != NULL);
3444                 continue;
3445 
3446             } else if (is_remote_node(this_node)) {
3447                 determine_remote_online_status(data_set, this_node);
3448 
3449             } else {
3450                 determine_online_status(node_state, this_node, data_set);
3451             }
3452 
3453             if (this_node->details->online || is_set(data_set->flags, pe_flag_stonith_enabled)) {
3454                 /* Offline nodes run no resources...
3455                  * unless stonith is enabled, in which case we need to
3456                  * make sure rsc start events happen after the stonith
3457                  */
3458                 xmlNode *lrm_rsc = NULL;
3459 
3460                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
3461                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
3462 
3463                 for (lrm_rsc = __xml_first_child(tmp); lrm_rsc != NULL;
3464                      lrm_rsc = __xml_next_element(lrm_rsc)) {
3465                     if (crm_str_eq((const char *)lrm_rsc->name, XML_LRM_TAG_RESOURCE, TRUE)) {
3466 
3467                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
3468 
3469                         if (rsc != NULL && safe_str_neq(rsc_id, rsc)) {
3470                             continue;
3471                         }
3472 
3473                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
3474                         output = g_list_concat(output, intermediate);
3475                     }
3476                 }
3477             }
3478         }
3479     }
3480 
3481     return output;
3482 }
