root/lib/pacemaker/pcmk_sched_ordering.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. invert_action
  2. get_ordering_type
  3. get_ordering_symmetry
  4. ordering_flags_for_kind
  5. get_ordering_resource
  6. get_minimum_first_instances
  7. clone_min_ordering
  8. inverse_ordering
  9. unpack_simple_rsc_order
  10. task_from_action_or_key
  11. handle_migration_ordering
  12. pcmk__new_ordering
  13. unpack_order_set
  14. order_rsc_sets
  15. unpack_order_tags
  16. pcmk__unpack_ordering
  17. ordering_is_invalid
  18. pcmk__disable_invalid_orderings
  19. pcmk__order_stops_before_shutdown
  20. find_actions_by_task
  21. rsc_order_then
  22. rsc_order_first
  23. pcmk__apply_orderings
  24. pcmk__order_after_each

   1 /*
   2  * Copyright 2004-2022 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdbool.h>
  13 #include <glib.h>
  14 
  15 #include <crm/crm.h>
  16 #include <pacemaker-internal.h>
  17 #include "libpacemaker_private.h"
  18 
  19 enum pe_order_kind {
  20     pe_order_kind_optional,
  21     pe_order_kind_mandatory,
  22     pe_order_kind_serialize,
  23 };
  24 
  25 enum ordering_symmetry {
  26     ordering_asymmetric,        // the only relation in an asymmetric ordering
  27     ordering_symmetric,         // the normal relation in a symmetric ordering
  28     ordering_symmetric_inverse, // the inverse relation in a symmetric ordering
  29 };
  30 
  31 #define EXPAND_CONSTRAINT_IDREF(__set, __rsc, __name) do {                      \
  32         __rsc = pcmk__find_constraint_resource(data_set->resources, __name);    \
  33         if (__rsc == NULL) {                                                    \
  34             pcmk__config_err("%s: No resource found for %s", __set, __name);    \
  35             return pcmk_rc_schema_validation;                                   \
  36         }                                                                       \
  37     } while (0)
  38 
  39 static const char *
  40 invert_action(const char *action)
     /* [previous][next][first][last][top][bottom][index][help] */
  41 {
  42     if (pcmk__str_eq(action, RSC_START, pcmk__str_casei)) {
  43         return RSC_STOP;
  44 
  45     } else if (pcmk__str_eq(action, RSC_STOP, pcmk__str_casei)) {
  46         return RSC_START;
  47 
  48     } else if (pcmk__str_eq(action, RSC_PROMOTE, pcmk__str_casei)) {
  49         return RSC_DEMOTE;
  50 
  51     } else if (pcmk__str_eq(action, RSC_DEMOTE, pcmk__str_casei)) {
  52         return RSC_PROMOTE;
  53 
  54     } else if (pcmk__str_eq(action, RSC_PROMOTED, pcmk__str_casei)) {
  55         return RSC_DEMOTED;
  56 
  57     } else if (pcmk__str_eq(action, RSC_DEMOTED, pcmk__str_casei)) {
  58         return RSC_PROMOTED;
  59 
  60     } else if (pcmk__str_eq(action, RSC_STARTED, pcmk__str_casei)) {
  61         return RSC_STOPPED;
  62 
  63     } else if (pcmk__str_eq(action, RSC_STOPPED, pcmk__str_casei)) {
  64         return RSC_STARTED;
  65     }
  66     crm_warn("Unknown action '%s' specified in order constraint", action);
  67     return NULL;
  68 }
  69 
  70 static enum pe_order_kind
  71 get_ordering_type(xmlNode *xml_obj)
     /* [previous][next][first][last][top][bottom][index][help] */
  72 {
  73     enum pe_order_kind kind_e = pe_order_kind_mandatory;
  74     const char *kind = crm_element_value(xml_obj, XML_ORDER_ATTR_KIND);
  75 
  76     if (kind == NULL) {
  77         const char *score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
  78 
  79         kind_e = pe_order_kind_mandatory;
  80 
  81         if (score) {
  82             // @COMPAT deprecated informally since 1.0.7, formally since 2.0.1
  83             int score_i = char2score(score);
  84 
  85             if (score_i == 0) {
  86                 kind_e = pe_order_kind_optional;
  87             }
  88             pe_warn_once(pe_wo_order_score,
  89                          "Support for 'score' in rsc_order is deprecated "
  90                          "and will be removed in a future release "
  91                          "(use 'kind' instead)");
  92         }
  93 
  94     } else if (pcmk__str_eq(kind, "Mandatory", pcmk__str_casei)) {
  95         kind_e = pe_order_kind_mandatory;
  96 
  97     } else if (pcmk__str_eq(kind, "Optional", pcmk__str_casei)) {
  98         kind_e = pe_order_kind_optional;
  99 
 100     } else if (pcmk__str_eq(kind, "Serialize", pcmk__str_casei)) {
 101         kind_e = pe_order_kind_serialize;
 102 
 103     } else {
 104         pcmk__config_err("Resetting '" XML_ORDER_ATTR_KIND "' for constraint "
 105                          "'%s' to Mandatory because '%s' is not valid",
 106                          crm_str(ID(xml_obj)), kind);
 107     }
 108     return kind_e;
 109 }
 110 
 111 /*!
 112  * \internal
 113  * \brief Get ordering symmetry from XML
 114  *
 115  * \param[in] xml_obj               Ordering XML
 116  * \param[in] parent_kind           Default ordering kind
 117  * \param[in] parent_symmetrical_s  Parent element's symmetrical setting, if any
 118  *
 119  * \retval ordering_symmetric   Ordering is symmetric
 120  * \retval ordering_asymmetric  Ordering is asymmetric
 121  */
 122 static enum ordering_symmetry
 123 get_ordering_symmetry(xmlNode *xml_obj, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 124                       const char *parent_symmetrical_s)
 125 {
 126     int rc = pcmk_rc_ok;
 127     bool symmetric = false;
 128     enum pe_order_kind kind = parent_kind; // Default to parent's kind
 129 
 130     // Check ordering XML for explicit kind
 131     if ((crm_element_value(xml_obj, XML_ORDER_ATTR_KIND) != NULL)
 132         || (crm_element_value(xml_obj, XML_RULE_ATTR_SCORE) != NULL)) {
 133         kind = get_ordering_type(xml_obj);
 134     }
 135 
 136     // Check ordering XML (and parent) for explicit symmetrical setting
 137     rc = pcmk__xe_get_bool_attr(xml_obj, XML_CONS_ATTR_SYMMETRICAL, &symmetric);
 138 
 139     if (rc != pcmk_rc_ok && parent_symmetrical_s != NULL) {
 140         symmetric = crm_is_true(parent_symmetrical_s);
 141         rc = pcmk_rc_ok;
 142     }
 143 
 144     if (rc == pcmk_rc_ok) {
 145         if (symmetric) {
 146             if (kind == pe_order_kind_serialize) {
 147                 pcmk__config_warn("Ignoring " XML_CONS_ATTR_SYMMETRICAL
 148                                   " for '%s' because not valid with "
 149                                   XML_ORDER_ATTR_KIND " of 'Serialize'",
 150                                   ID(xml_obj));
 151             } else {
 152                 return ordering_symmetric;
 153             }
 154         }
 155         return ordering_asymmetric;
 156     }
 157 
 158     // Use default symmetry
 159     if (kind == pe_order_kind_serialize) {
 160         return ordering_asymmetric;
 161     }
 162     return ordering_symmetric;
 163 }
 164 
 165 /*!
 166  * \internal
 167  * \brief Get ordering flags appropriate to ordering kind
 168  *
 169  * \param[in] kind      Ordering kind
 170  * \param[in] first     Action name for 'first' action
 171  * \param[in] symmetry  This ordering's symmetry role
 172  *
 173  * \return Minimal ordering flags appropriate to \p kind
 174  */
 175 static enum pe_ordering
 176 ordering_flags_for_kind(enum pe_order_kind kind, const char *first,
     /* [previous][next][first][last][top][bottom][index][help] */
 177                         enum ordering_symmetry symmetry)
 178 {
 179     enum pe_ordering flags = pe_order_none; // so we trace-log all flags set
 180 
 181     pe__set_order_flags(flags, pe_order_optional);
 182 
 183     switch (kind) {
 184         case pe_order_kind_optional:
 185             break;
 186 
 187         case pe_order_kind_serialize:
 188             pe__set_order_flags(flags, pe_order_serialize_only);
 189             break;
 190 
 191         case pe_order_kind_mandatory:
 192             switch (symmetry) {
 193                 case ordering_asymmetric:
 194                     pe__set_order_flags(flags, pe_order_asymmetrical);
 195                     break;
 196 
 197                 case ordering_symmetric:
 198                     pe__set_order_flags(flags, pe_order_implies_then);
 199                     if (pcmk__strcase_any_of(first, RSC_START, RSC_PROMOTE,
 200                                              NULL)) {
 201                         pe__set_order_flags(flags, pe_order_runnable_left);
 202                     }
 203                     break;
 204 
 205                 case ordering_symmetric_inverse:
 206                     pe__set_order_flags(flags, pe_order_implies_first);
 207                     break;
 208             }
 209             break;
 210     }
 211     return flags;
 212 }
 213 
 214 /*!
 215  * \internal
 216  * \brief Find resource corresponding to ID specified in ordering
 217  *
 218  * \param[in] xml            Ordering XML
 219  * \param[in] resource_attr  XML attribute name for resource ID
 220  * \param[in] instance_attr  XML attribute name for instance number
 221  * \param[in] data_set       Cluster working set
 222  *
 223  * \return Resource corresponding to \p id, or NULL if none
 224  */
 225 static pe_resource_t *
 226 get_ordering_resource(xmlNode *xml, const char *resource_attr,
     /* [previous][next][first][last][top][bottom][index][help] */
 227                       const char *instance_attr, pe_working_set_t *data_set)
 228 {
 229     pe_resource_t *rsc = NULL;
 230     const char *rsc_id = crm_element_value(xml, resource_attr);
 231     const char *instance_id = crm_element_value(xml, instance_attr);
 232 
 233     if (rsc_id == NULL) {
 234         pcmk__config_err("Ignoring constraint '%s' without %s",
 235                          ID(xml), resource_attr);
 236         return NULL;
 237     }
 238 
 239     rsc = pcmk__find_constraint_resource(data_set->resources, rsc_id);
 240     if (rsc == NULL) {
 241         pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 242                          "does not exist", ID(xml), rsc_id);
 243         return NULL;
 244     }
 245 
 246     if (instance_id != NULL) {
 247         if (!pe_rsc_is_clone(rsc)) {
 248             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 249                              "is not a clone but instance '%s' was requested",
 250                              ID(xml), rsc_id, instance_id);
 251             return NULL;
 252         }
 253         rsc = find_clone_instance(rsc, instance_id, data_set);
 254         if (rsc == NULL) {
 255             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 256                              "does not have an instance '%s'",
 257                              "'%s'", ID(xml), rsc_id, instance_id);
 258             return NULL;
 259         }
 260     }
 261     return rsc;
 262 }
 263 
 264 /*!
 265  * \internal
 266  * \brief Determine minimum number of 'first' instances required in ordering
 267  *
 268  * \param[in] rsc  'First' resource in ordering
 269  * \param[in] xml  Ordering XML
 270  *
 271  * \return Minimum 'first' instances required (or 0 if not applicable)
 272  */
 273 static int
 274 get_minimum_first_instances(pe_resource_t *rsc, xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 275 {
 276     const char *clone_min = NULL;
 277     bool require_all = false;
 278 
 279     if (!pe_rsc_is_clone(rsc)) {
 280         return 0;
 281     }
 282 
 283     clone_min = g_hash_table_lookup(rsc->meta,
 284                                     XML_RSC_ATTR_INCARNATION_MIN);
 285     if (clone_min != NULL) {
 286         int clone_min_int = 0;
 287 
 288         pcmk__scan_min_int(clone_min, &clone_min_int, 0);
 289         return clone_min_int;
 290     }
 291 
 292     /* @COMPAT 1.1.13:
 293      * require-all=false is deprecated equivalent of clone-min=1
 294      */
 295     if (pcmk__xe_get_bool_attr(xml, "require-all", &require_all) != ENODATA) {
 296         pe_warn_once(pe_wo_require_all,
 297                      "Support for require-all in ordering constraints "
 298                      "is deprecated and will be removed in a future release"
 299                      " (use clone-min clone meta-attribute instead)");
 300         if (!require_all) {
 301             return 1;
 302         }
 303     }
 304 
 305     return 0;
 306 }
 307 
 308 /*!
 309  * \internal
 310  * \brief Create orderings for a constraint with clone-min > 0
 311  *
 312  * \param[in] id            Ordering ID
 313  * \param[in] rsc_first     'First' resource in ordering (a clone)
 314  * \param[in] action_first  'First' action in ordering
 315  * \param[in] rsc_then      'Then' resource in ordering
 316  * \param[in] action_then   'Then' action in ordering
 317  * \param[in] flags         Ordering flags
 318  * \param[in] clone_min     Minimum required instances of 'first'
 319  * \param[in] data_set      Cluster working set
 320  */
 321 static void
 322 clone_min_ordering(const char *id,
     /* [previous][next][first][last][top][bottom][index][help] */
 323                    pe_resource_t *rsc_first, const char *action_first,
 324                    pe_resource_t *rsc_then, const char *action_then,
 325                    enum pe_ordering flags, int clone_min,
 326                    pe_working_set_t *data_set)
 327 {
 328     // Create a pseudo-action for when the minimum instances are active
 329     char *task = crm_strdup_printf(CRM_OP_RELAXED_CLONE ":%s", id);
 330     pe_action_t *clone_min_met = get_pseudo_op(task, data_set);
 331 
 332     free(task);
 333 
 334     /* Require the pseudo-action to have the required number of actions to be
 335      * considered runnable before allowing the pseudo-action to be runnable.
 336      */
 337     clone_min_met->required_runnable_before = clone_min;
 338     pe__set_action_flags(clone_min_met, pe_action_requires_any);
 339 
 340     // Order the actions for each clone instance before the pseudo-action
 341     for (GList *rIter = rsc_first->children; rIter != NULL;
 342          rIter = rIter->next) {
 343 
 344         pe_resource_t *child = rIter->data;
 345 
 346         pcmk__new_ordering(child, pcmk__op_key(child->id, action_first, 0),
 347                            NULL, NULL, NULL, clone_min_met,
 348                            pe_order_one_or_more|pe_order_implies_then_printed,
 349                            data_set);
 350     }
 351 
 352     // Order "then" action after the pseudo-action (if runnable)
 353     pcmk__new_ordering(NULL, NULL, clone_min_met, rsc_then,
 354                        pcmk__op_key(rsc_then->id, action_then, 0),
 355                        NULL, flags|pe_order_runnable_left, data_set);
 356 }
 357 
 358 /*!
 359  * \internal
 360  * \brief Update ordering flags for restart-type=restart
 361  *
 362  * \param[in]  rsc    'Then' resource in ordering
 363  * \param[in]  kind   Ordering kind
 364  * \param[in]  flag   Ordering flag to set (when applicable)
 365  * \param[out] flags  Ordering flag set to update
 366  *
 367  * \compat The restart-type resource meta-attribute is deprecated. Eventually,
 368  *         it will be removed, and pe_restart_ignore will be the only behavior,
 369  *         at which time this can just be removed entirely.
 370  */
 371 #define handle_restart_type(rsc, kind, flag, flags) do {        \
 372         if (((kind) == pe_order_kind_optional)                  \
 373             && ((rsc)->restart_type == pe_restart_restart)) {   \
 374             pe__set_order_flags((flags), (flag));               \
 375         }                                                       \
 376     } while (0)
 377 
 378 /*!
 379  * \internal
 380  * \brief Create new ordering for inverse of symmetric constraint
 381  *
 382  * \param[in] id            Ordering ID (for logging only)
 383  * \param[in] kind          Ordering kind
 384  * \param[in] rsc_first     'First' resource in ordering (a clone)
 385  * \param[in] action_first  'First' action in ordering
 386  * \param[in] rsc_then      'Then' resource in ordering
 387  * \param[in] action_then   'Then' action in ordering
 388  * \param[in] data_set      Cluster working set
 389  */
 390 static void
 391 inverse_ordering(const char *id, enum pe_order_kind kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 392                  pe_resource_t *rsc_first, const char *action_first,
 393                  pe_resource_t *rsc_then, const char *action_then,
 394                  pe_working_set_t *data_set)
 395 {
 396     action_then = invert_action(action_then);
 397     action_first = invert_action(action_first);
 398     if ((action_then == NULL) || (action_first == NULL)) {
 399         pcmk__config_warn("Cannot invert constraint '%s' "
 400                           "(please specify inverse manually)", id);
 401     } else {
 402         enum pe_ordering flags = ordering_flags_for_kind(kind, action_first,
 403                                                          ordering_symmetric_inverse);
 404 
 405         handle_restart_type(rsc_then, kind, pe_order_implies_first, flags);
 406         pcmk__order_resource_actions(rsc_then, action_then, rsc_first,
 407                                      action_first, flags, data_set);
 408     }
 409 }
 410 
 411 static void
 412 unpack_simple_rsc_order(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 413 {
 414     pe_resource_t *rsc_then = NULL;
 415     pe_resource_t *rsc_first = NULL;
 416     int min_required_before = 0;
 417     enum pe_order_kind kind = pe_order_kind_mandatory;
 418     enum pe_ordering cons_weight = pe_order_none;
 419     enum ordering_symmetry symmetry;
 420 
 421     const char *action_then = NULL;
 422     const char *action_first = NULL;
 423     const char *id = NULL;
 424 
 425     CRM_CHECK(xml_obj != NULL, return);
 426 
 427     id = crm_element_value(xml_obj, XML_ATTR_ID);
 428     if (id == NULL) {
 429         pcmk__config_err("Ignoring <%s> constraint without " XML_ATTR_ID,
 430                          crm_element_name(xml_obj));
 431         return;
 432     }
 433 
 434     rsc_first = get_ordering_resource(xml_obj, XML_ORDER_ATTR_FIRST,
 435                                       XML_ORDER_ATTR_FIRST_INSTANCE,
 436                                       data_set);
 437     if (rsc_first == NULL) {
 438         return;
 439     }
 440 
 441     rsc_then = get_ordering_resource(xml_obj, XML_ORDER_ATTR_THEN,
 442                                      XML_ORDER_ATTR_THEN_INSTANCE,
 443                                      data_set);
 444     if (rsc_then == NULL) {
 445         return;
 446     }
 447 
 448     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
 449     if (action_first == NULL) {
 450         action_first = RSC_START;
 451     }
 452 
 453     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
 454     if (action_then == NULL) {
 455         action_then = action_first;
 456     }
 457 
 458     kind = get_ordering_type(xml_obj);
 459 
 460     symmetry = get_ordering_symmetry(xml_obj, kind, NULL);
 461     cons_weight = ordering_flags_for_kind(kind, action_first, symmetry);
 462 
 463     handle_restart_type(rsc_then, kind, pe_order_implies_then, cons_weight);
 464 
 465     /* If there is a minimum number of instances that must be runnable before
 466      * the 'then' action is runnable, we use a pseudo-action for convenience:
 467      * minimum number of clone instances have runnable actions ->
 468      * pseudo-action is runnable -> dependency is runnable.
 469      */
 470     min_required_before = get_minimum_first_instances(rsc_first, xml_obj);
 471     if (min_required_before > 0) {
 472         clone_min_ordering(id, rsc_first, action_first, rsc_then, action_then,
 473                            cons_weight, min_required_before, data_set);
 474     } else {
 475         pcmk__order_resource_actions(rsc_first, action_first, rsc_then,
 476                                      action_then, cons_weight, data_set);
 477     }
 478 
 479     if (symmetry == ordering_symmetric) {
 480         inverse_ordering(id, kind, rsc_first, action_first,
 481                          rsc_then, action_then, data_set);
 482     }
 483 }
 484 
 485 static char *
 486 task_from_action_or_key(pe_action_t *action, const char *key)
     /* [previous][next][first][last][top][bottom][index][help] */
 487 {
 488     char *res = NULL;
 489 
 490     if (action != NULL) {
 491         res = strdup(action->task);
 492     } else if (key != NULL) {
 493         parse_op_key(key, NULL, &res, NULL);
 494     }
 495     return res;
 496 }
 497 
 498 /*!
 499  * \internal
 500  * \brief Apply start/stop orderings to migrations
 501  *
 502  * Orderings involving start, stop, demote, and promote actions must be honored
 503  * during a migration as well, so duplicate any such ordering for the
 504  * corresponding migration actions.
 505  *
 506  * \param[in] order     Ordering constraint to check
 507  * \param[in] data_set  Cluster working set
 508  */
 509 static void
 510 handle_migration_ordering(pe__ordering_t *order, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 511 {
 512     char *lh_task = NULL;
 513     char *rh_task = NULL;
 514     bool rh_migratable;
 515     bool lh_migratable;
 516 
 517     // Only orderings between two different resources are relevant
 518     if ((order->lh_rsc == NULL) || (order->rh_rsc == NULL)
 519         || (order->lh_rsc == order->rh_rsc)) {
 520         return;
 521     }
 522 
 523     // Constraints between a parent resource and its children are not relevant
 524     if (is_parent(order->lh_rsc, order->rh_rsc)
 525         || is_parent(order->rh_rsc, order->lh_rsc)) {
 526         return;
 527     }
 528 
 529     // Only orderings involving at least one migratable resource are relevant
 530     lh_migratable = pcmk_is_set(order->lh_rsc->flags, pe_rsc_allow_migrate);
 531     rh_migratable = pcmk_is_set(order->rh_rsc->flags, pe_rsc_allow_migrate);
 532     if (!lh_migratable && !rh_migratable) {
 533         return;
 534     }
 535 
 536     // Check which actions are involved
 537     lh_task = task_from_action_or_key(order->lh_action, order->lh_action_task);
 538     rh_task = task_from_action_or_key(order->rh_action, order->rh_action_task);
 539     if ((lh_task == NULL) || (rh_task == NULL)) {
 540         goto cleanup_order;
 541     }
 542 
 543     if (pcmk__str_eq(lh_task, RSC_START, pcmk__str_casei)
 544         && pcmk__str_eq(rh_task, RSC_START, pcmk__str_casei)) {
 545 
 546         int flags = pe_order_optional;
 547 
 548         if (lh_migratable && rh_migratable) {
 549             /* A start then B start
 550              * -> A migrate_from then B migrate_to */
 551             pcmk__new_ordering(order->lh_rsc,
 552                                pcmk__op_key(order->lh_rsc->id, RSC_MIGRATED, 0),
 553                                NULL, order->rh_rsc,
 554                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 555                                NULL, flags, data_set);
 556         }
 557 
 558         if (rh_migratable) {
 559             if (lh_migratable) {
 560                 pe__set_order_flags(flags, pe_order_apply_first_non_migratable);
 561             }
 562 
 563             /* A start then B start
 564              * -> A start then B migrate_to (if start is not part of a
 565              *    migration)
 566              */
 567             pcmk__new_ordering(order->lh_rsc,
 568                                pcmk__op_key(order->lh_rsc->id, RSC_START, 0),
 569                                NULL, order->rh_rsc,
 570                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 571                                NULL, flags, data_set);
 572         }
 573 
 574     } else if (rh_migratable && pcmk__str_eq(lh_task, RSC_STOP, pcmk__str_casei)
 575                && pcmk__str_eq(rh_task, RSC_STOP, pcmk__str_casei)) {
 576 
 577         int flags = pe_order_optional;
 578 
 579         if (lh_migratable) {
 580             pe__set_order_flags(flags, pe_order_apply_first_non_migratable);
 581         }
 582 
 583         /* For an ordering "stop A then stop B", if A is moving via restart, and
 584          * B is migrating, enforce that B's migrate_to occurs after A's stop.
 585          */
 586         pcmk__new_ordering(order->lh_rsc,
 587                            pcmk__op_key(order->lh_rsc->id, RSC_STOP, 0), NULL,
 588                            order->rh_rsc,
 589                            pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 590                            NULL, flags, data_set);
 591 
 592         // Also order B's migrate_from after A's stop during partial migrations
 593         if (order->rh_rsc->partial_migration_target) {
 594             pcmk__new_ordering(order->lh_rsc,
 595                                pcmk__op_key(order->lh_rsc->id, RSC_STOP, 0),
 596                                NULL, order->rh_rsc,
 597                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATED, 0),
 598                                NULL, flags, data_set);
 599         }
 600 
 601     } else if (pcmk__str_eq(lh_task, RSC_PROMOTE, pcmk__str_casei)
 602                && pcmk__str_eq(rh_task, RSC_START, pcmk__str_casei)) {
 603 
 604         int flags = pe_order_optional;
 605 
 606         if (rh_migratable) {
 607             /* A promote then B start
 608              * -> A promote then B migrate_to */
 609             pcmk__new_ordering(order->lh_rsc,
 610                                pcmk__op_key(order->lh_rsc->id, RSC_PROMOTE, 0),
 611                                NULL, order->rh_rsc,
 612                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 613                                NULL, flags, data_set);
 614         }
 615 
 616     } else if (pcmk__str_eq(lh_task, RSC_DEMOTE, pcmk__str_casei)
 617                && pcmk__str_eq(rh_task, RSC_STOP, pcmk__str_casei)) {
 618 
 619         int flags = pe_order_optional;
 620 
 621         if (rh_migratable) {
 622             /* A demote then B stop
 623              * -> A demote then B migrate_to */
 624             pcmk__new_ordering(order->lh_rsc,
 625                                pcmk__op_key(order->lh_rsc->id, RSC_DEMOTE, 0),
 626                                NULL, order->rh_rsc,
 627                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 628                                NULL, flags, data_set);
 629 
 630             // Also order B migrate_from after A demote during partial migrations
 631             if (order->rh_rsc->partial_migration_target) {
 632                 pcmk__new_ordering(order->lh_rsc,
 633                                    pcmk__op_key(order->lh_rsc->id, RSC_DEMOTE, 0),
 634                                    NULL, order->rh_rsc,
 635                                    pcmk__op_key(order->rh_rsc->id, RSC_MIGRATED, 0),
 636                                    NULL, flags, data_set);
 637             }
 638         }
 639     }
 640 
 641 cleanup_order:
 642     free(lh_task);
 643     free(rh_task);
 644 }
 645 
 646 /*!
 647  * \internal
 648  * \brief Create a new ordering between two actions
 649  *
 650  * \param[in] lh_rsc           Resource for 'first' action (if NULL and
 651  *                             \p lh_action is a resource action, that
 652  *                             resource will be used)
 653  * \param[in] lh_action_task   Action key for 'first' action (if NULL and
 654  *                             \p lh_action is not NULL, its UUID will be used)
 655  * \param[in] lh_action        'first' action (if NULL, \p lh_rsc and
 656  *                             \p lh_action_task must be set)
 657  *
 658  * \param[in] rh_rsc           Resource for 'then' action (if NULL and
 659  *                             \p rh_action is a resource action, that
 660  *                             resource will be used)
 661  * \param[in] rh_action_task   Action key for 'then' action (if NULL and
 662  *                             \p rh_action is not NULL, its UUID will be used)
 663  * \param[in] rh_action        'then' action (if NULL, \p rh_rsc and
 664  *                             \p rh_action_task must be set)
 665  *
 666  * \param[in] type             Flag set of enum pe_ordering
 667  * \param[in] data_set         Cluster working set to add ordering to
 668  *
 669  * \note This function takes ownership of lh_action_task and rh_action_task,
 670  *       which do not need to be freed by the caller.
 671  */
 672 void
 673 pcmk__new_ordering(pe_resource_t *lh_rsc, char *lh_action_task,
     /* [previous][next][first][last][top][bottom][index][help] */
 674                    pe_action_t *lh_action, pe_resource_t *rh_rsc,
 675                    char *rh_action_task, pe_action_t *rh_action,
 676                    enum pe_ordering type, pe_working_set_t *data_set)
 677 {
 678     pe__ordering_t *order = NULL;
 679 
 680     // One of action or resource must be specified for each side
 681     CRM_CHECK(((lh_action != NULL) || (lh_rsc != NULL))
 682               && ((rh_action != NULL) || (rh_rsc != NULL)),
 683               free(lh_action_task); free(rh_action_task); return);
 684 
 685     if ((lh_rsc == NULL) && (lh_action != NULL)) {
 686         lh_rsc = lh_action->rsc;
 687     }
 688     if ((rh_rsc == NULL) && (rh_action != NULL)) {
 689         rh_rsc = rh_action->rsc;
 690     }
 691 
 692     order = calloc(1, sizeof(pe__ordering_t));
 693     CRM_ASSERT(order != NULL);
 694 
 695     order->id = data_set->order_id++;
 696     order->type = type;
 697     order->lh_rsc = lh_rsc;
 698     order->rh_rsc = rh_rsc;
 699     order->lh_action = lh_action;
 700     order->rh_action = rh_action;
 701     order->lh_action_task = lh_action_task;
 702     order->rh_action_task = rh_action_task;
 703 
 704     if ((order->lh_action_task == NULL) && (lh_action != NULL)) {
 705         order->lh_action_task = strdup(lh_action->uuid);
 706     }
 707 
 708     if ((order->rh_action_task == NULL) && (rh_action != NULL)) {
 709         order->rh_action_task = strdup(rh_action->uuid);
 710     }
 711 
 712     if ((order->lh_rsc == NULL) && (lh_action != NULL)) {
 713         order->lh_rsc = lh_action->rsc;
 714     }
 715 
 716     if ((order->rh_rsc == NULL) && (rh_action != NULL)) {
 717         order->rh_rsc = rh_action->rsc;
 718     }
 719 
 720     pe_rsc_trace(lh_rsc, "Created ordering %d for %s then %s",
 721                  (data_set->order_id - 1),
 722                  ((lh_action_task == NULL)? "?" : lh_action_task),
 723                  ((rh_action_task == NULL)? "?" : rh_action_task));
 724 
 725     data_set->ordering_constraints = g_list_prepend(data_set->ordering_constraints,
 726                                                     order);
 727     handle_migration_ordering(order, data_set);
 728 }
 729 
 730 /*!
 731  * \brief Unpack a set in an ordering constraint
 732  *
 733  * \param[in]  set                    Set XML to unpack
 734  * \param[in]  parent_kind            rsc_order XML "kind" attribute
 735  * \param[in]  parent_symmetrical_s   rsc_order XML "symmetrical" attribute
 736  * \param[in]  data_set               Cluster working set
 737  *
 738  * \return Standard Pacemaker return code
 739  */
 740 static int
 741 unpack_order_set(xmlNode *set, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 742                  const char *parent_symmetrical_s, pe_working_set_t *data_set)
 743 {
 744     xmlNode *xml_rsc = NULL;
 745     GList *set_iter = NULL;
 746     GList *resources = NULL;
 747 
 748     pe_resource_t *last = NULL;
 749     pe_resource_t *resource = NULL;
 750 
 751     int local_kind = parent_kind;
 752     bool sequential = false;
 753     enum pe_ordering flags = pe_order_optional;
 754     enum ordering_symmetry symmetry;
 755 
 756     char *key = NULL;
 757     const char *id = ID(set);
 758     const char *action = crm_element_value(set, "action");
 759     const char *sequential_s = crm_element_value(set, "sequential");
 760     const char *kind_s = crm_element_value(set, XML_ORDER_ATTR_KIND);
 761 
 762     if (action == NULL) {
 763         action = RSC_START;
 764     }
 765 
 766     if (kind_s) {
 767         local_kind = get_ordering_type(set);
 768     }
 769     if (sequential_s == NULL) {
 770         sequential_s = "1";
 771     }
 772 
 773     sequential = crm_is_true(sequential_s);
 774 
 775     symmetry = get_ordering_symmetry(set, parent_kind, parent_symmetrical_s);
 776     flags = ordering_flags_for_kind(local_kind, action, symmetry);
 777 
 778     for (xml_rsc = first_named_child(set, XML_TAG_RESOURCE_REF);
 779          xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 780 
 781         EXPAND_CONSTRAINT_IDREF(id, resource, ID(xml_rsc));
 782         resources = g_list_append(resources, resource);
 783     }
 784 
 785     if (pcmk__list_of_1(resources)) {
 786         crm_trace("Single set: %s", id);
 787         goto done;
 788     }
 789 
 790     set_iter = resources;
 791     while (set_iter != NULL) {
 792         resource = (pe_resource_t *) set_iter->data;
 793         set_iter = set_iter->next;
 794 
 795         key = pcmk__op_key(resource->id, action, 0);
 796 
 797         if (local_kind == pe_order_kind_serialize) {
 798             /* Serialize before everything that comes after */
 799 
 800             for (GList *gIter = set_iter; gIter != NULL; gIter = gIter->next) {
 801                 pe_resource_t *then_rsc = (pe_resource_t *) gIter->data;
 802                 char *then_key = pcmk__op_key(then_rsc->id, action, 0);
 803 
 804                 pcmk__new_ordering(resource, strdup(key), NULL, then_rsc,
 805                                    then_key, NULL, flags, data_set);
 806             }
 807 
 808         } else if (sequential) {
 809             if (last != NULL) {
 810                 pcmk__order_resource_actions(last, action, resource, action,
 811                                              flags, data_set);
 812             }
 813             last = resource;
 814         }
 815         free(key);
 816     }
 817 
 818     if (symmetry == ordering_asymmetric) {
 819         goto done;
 820     }
 821 
 822     last = NULL;
 823     action = invert_action(action);
 824 
 825     flags = ordering_flags_for_kind(local_kind, action,
 826                                     ordering_symmetric_inverse);
 827 
 828     set_iter = resources;
 829     while (set_iter != NULL) {
 830         resource = (pe_resource_t *) set_iter->data;
 831         set_iter = set_iter->next;
 832 
 833         if (sequential) {
 834             if (last != NULL) {
 835                 pcmk__order_resource_actions(resource, action, last, action,
 836                                              flags, data_set);
 837             }
 838             last = resource;
 839         }
 840     }
 841 
 842   done:
 843     g_list_free(resources);
 844     return pcmk_rc_ok;
 845 }
 846 
 847 /*!
 848  * \brief Order two resource sets relative to each other
 849  *
 850  * \param[in] id        Ordering ID (for logging)
 851  * \param[in] set1      First listed set
 852  * \param[in] set2      Second listed set
 853  * \param[in] kind      Ordering kind
 854  * \param[in] data_set  Cluster working set
 855  * \param[in] symmetry  Which ordering symmetry applies to this relation
 856  *
 857  * \return Standard Pacemaker return code
 858  */
 859 static int
 860 order_rsc_sets(const char *id, xmlNode *set1, xmlNode *set2,
     /* [previous][next][first][last][top][bottom][index][help] */
 861                enum pe_order_kind kind, pe_working_set_t *data_set,
 862                enum ordering_symmetry symmetry)
 863 {
 864 
 865     xmlNode *xml_rsc = NULL;
 866     xmlNode *xml_rsc_2 = NULL;
 867 
 868     pe_resource_t *rsc_1 = NULL;
 869     pe_resource_t *rsc_2 = NULL;
 870 
 871     const char *action_1 = crm_element_value(set1, "action");
 872     const char *action_2 = crm_element_value(set2, "action");
 873 
 874     enum pe_ordering flags = pe_order_none;
 875 
 876     bool require_all = true;
 877 
 878     pcmk__xe_get_bool_attr(set1, "require-all", &require_all);
 879 
 880     if (action_1 == NULL) {
 881         action_1 = RSC_START;
 882     }
 883 
 884     if (action_2 == NULL) {
 885         action_2 = RSC_START;
 886     }
 887 
 888     if (symmetry == ordering_symmetric_inverse) {
 889         action_1 = invert_action(action_1);
 890         action_2 = invert_action(action_2);
 891     }
 892 
 893     if (pcmk__str_eq(RSC_STOP, action_1, pcmk__str_casei)
 894         || pcmk__str_eq(RSC_DEMOTE, action_1, pcmk__str_casei)) {
 895         /* Assuming: A -> ( B || C) -> D
 896          * The one-or-more logic only applies during the start/promote phase.
 897          * During shutdown neither B nor can shutdown until D is down, so simply
 898          * turn require_all back on.
 899          */
 900         require_all = true;
 901     }
 902 
 903     // @TODO is action_2 correct here?
 904     flags = ordering_flags_for_kind(kind, action_2, symmetry);
 905 
 906     /* If we have an unordered set1, whether it is sequential or not is
 907      * irrelevant in regards to set2.
 908      */
 909     if (!require_all) {
 910         char *task = crm_strdup_printf(CRM_OP_RELAXED_SET ":%s", ID(set1));
 911         pe_action_t *unordered_action = get_pseudo_op(task, data_set);
 912 
 913         free(task);
 914         pe__set_action_flags(unordered_action, pe_action_requires_any);
 915 
 916         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 917              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 918 
 919             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 920 
 921             /* Add an ordering constraint between every element in set1 and the
 922              * pseudo action. If any action in set1 is runnable the pseudo
 923              * action will be runnable.
 924              */
 925             pcmk__new_ordering(rsc_1, pcmk__op_key(rsc_1->id, action_1, 0),
 926                                NULL, NULL, NULL, unordered_action,
 927                                pe_order_one_or_more|pe_order_implies_then_printed,
 928                                data_set);
 929         }
 930         for (xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
 931              xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
 932 
 933             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
 934 
 935             /* Add an ordering constraint between the pseudo-action and every
 936              * element in set2. If the pseudo-action is runnable, every action
 937              * in set2 will be runnable.
 938              */
 939             pcmk__new_ordering(NULL, NULL, unordered_action,
 940                                rsc_2, pcmk__op_key(rsc_2->id, action_2, 0),
 941                                NULL, flags|pe_order_runnable_left, data_set);
 942         }
 943 
 944         return pcmk_rc_ok;
 945     }
 946 
 947     if (pcmk__xe_attr_is_true(set1, "sequential")) {
 948         if (symmetry == ordering_symmetric_inverse) {
 949             // Get the first one
 950             xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 951             if (xml_rsc != NULL) {
 952                 EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 953             }
 954 
 955         } else {
 956             // Get the last one
 957             const char *rid = NULL;
 958 
 959             for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 960                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 961 
 962                 rid = ID(xml_rsc);
 963             }
 964             EXPAND_CONSTRAINT_IDREF(id, rsc_1, rid);
 965         }
 966     }
 967 
 968     if (pcmk__xe_attr_is_true(set2, "sequential")) {
 969         if (symmetry == ordering_symmetric_inverse) {
 970             // Get the last one
 971             const char *rid = NULL;
 972 
 973             for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 974                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 975 
 976                 rid = ID(xml_rsc);
 977             }
 978             EXPAND_CONSTRAINT_IDREF(id, rsc_2, rid);
 979 
 980         } else {
 981             // Get the first one
 982             xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 983             if (xml_rsc != NULL) {
 984                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 985             }
 986         }
 987     }
 988 
 989     if ((rsc_1 != NULL) && (rsc_2 != NULL)) {
 990         pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2, flags,
 991                                      data_set);
 992 
 993     } else if (rsc_1 != NULL) {
 994         for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 995              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 996 
 997             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 998             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
 999                                          flags, data_set);
1000         }
1001 
1002     } else if (rsc_2 != NULL) {
1003         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
1004              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
1005 
1006             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
1007             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
1008                                          flags, data_set);
1009         }
1010 
1011     } else {
1012         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
1013              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
1014 
1015             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
1016 
1017             for (xmlNode *xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
1018                  xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
1019 
1020                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
1021                 pcmk__order_resource_actions(rsc_1, action_1, rsc_2,
1022                                              action_2, flags, data_set);
1023             }
1024         }
1025     }
1026 
1027     return pcmk_rc_ok;
1028 }
1029 
1030 /*!
1031  * \internal
1032  * \brief If an ordering constraint uses resource tags, expand them
1033  *
1034  * \param[in]  xml_obj       Ordering constraint XML
1035  * \param[out] expanded_xml  Equivalent XML with tags expanded
1036  * \param[in]  data_set      Cluster working set
1037  *
1038  * \return Standard Pacemaker return code (specifically, pcmk_rc_ok on success,
1039  *         and pcmk_rc_schema_validation on invalid configuration)
1040  */
1041 static int
1042 unpack_order_tags(xmlNode *xml_obj, xmlNode **expanded_xml,
     /* [previous][next][first][last][top][bottom][index][help] */
1043                   pe_working_set_t *data_set)
1044 {
1045     const char *id_first = NULL;
1046     const char *id_then = NULL;
1047     const char *action_first = NULL;
1048     const char *action_then = NULL;
1049 
1050     pe_resource_t *rsc_first = NULL;
1051     pe_resource_t *rsc_then = NULL;
1052     pe_tag_t *tag_first = NULL;
1053     pe_tag_t *tag_then = NULL;
1054 
1055     xmlNode *rsc_set_first = NULL;
1056     xmlNode *rsc_set_then = NULL;
1057     bool any_sets = false;
1058 
1059     // Check whether there are any resource sets with template or tag references
1060     *expanded_xml = pcmk__expand_tags_in_sets(xml_obj, data_set);
1061     if (*expanded_xml != NULL) {
1062         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
1063         return pcmk_rc_ok;
1064     }
1065 
1066     id_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST);
1067     id_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN);
1068     if ((id_first == NULL) || (id_then == NULL)) {
1069         return pcmk_rc_ok;
1070     }
1071 
1072     if (!pcmk__valid_resource_or_tag(data_set, id_first, &rsc_first,
1073                                      &tag_first)) {
1074         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
1075                          "valid resource or tag", ID(xml_obj), id_first);
1076         return pcmk_rc_schema_validation;
1077     }
1078 
1079     if (!pcmk__valid_resource_or_tag(data_set, id_then, &rsc_then, &tag_then)) {
1080         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
1081                          "valid resource or tag", ID(xml_obj), id_then);
1082         return pcmk_rc_schema_validation;
1083     }
1084 
1085     if ((rsc_first != NULL) && (rsc_then != NULL)) {
1086         // Neither side references a template or tag
1087         return pcmk_rc_ok;
1088     }
1089 
1090     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
1091     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
1092 
1093     *expanded_xml = copy_xml(xml_obj);
1094 
1095     // Convert template/tag reference in "first" into resource_set under constraint
1096     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_first, XML_ORDER_ATTR_FIRST,
1097                           true, data_set)) {
1098         free_xml(*expanded_xml);
1099         *expanded_xml = NULL;
1100         return pcmk_rc_schema_validation;
1101     }
1102 
1103     if (rsc_set_first != NULL) {
1104         if (action_first != NULL) {
1105             // Move "first-action" into converted resource_set as "action"
1106             crm_xml_add(rsc_set_first, "action", action_first);
1107             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_FIRST_ACTION);
1108         }
1109         any_sets = true;
1110     }
1111 
1112     // Convert template/tag reference in "then" into resource_set under constraint
1113     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_then, XML_ORDER_ATTR_THEN,
1114                           true, data_set)) {
1115         free_xml(*expanded_xml);
1116         *expanded_xml = NULL;
1117         return pcmk_rc_schema_validation;
1118     }
1119 
1120     if (rsc_set_then != NULL) {
1121         if (action_then != NULL) {
1122             // Move "then-action" into converted resource_set as "action"
1123             crm_xml_add(rsc_set_then, "action", action_then);
1124             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_THEN_ACTION);
1125         }
1126         any_sets = true;
1127     }
1128 
1129     if (any_sets) {
1130         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
1131     } else {
1132         free_xml(*expanded_xml);
1133         *expanded_xml = NULL;
1134     }
1135 
1136     return pcmk_rc_ok;
1137 }
1138 
1139 /*!
1140  * \internal
1141  * \brief Unpack ordering constraint XML
1142  *
1143  * \param[in]     xml_obj   Ordering constraint XML to unpack
1144  * \param[in,out] data_set  Cluster working set
1145  */
1146 void
1147 pcmk__unpack_ordering(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1148 {
1149     xmlNode *set = NULL;
1150     xmlNode *last = NULL;
1151 
1152     xmlNode *orig_xml = NULL;
1153     xmlNode *expanded_xml = NULL;
1154 
1155     const char *id = crm_element_value(xml_obj, XML_ATTR_ID);
1156     const char *invert = crm_element_value(xml_obj, XML_CONS_ATTR_SYMMETRICAL);
1157     enum pe_order_kind kind = get_ordering_type(xml_obj);
1158 
1159     enum ordering_symmetry symmetry = get_ordering_symmetry(xml_obj, kind,
1160                                                             NULL);
1161 
1162     // Expand any resource tags in the constraint XML
1163     if (unpack_order_tags(xml_obj, &expanded_xml, data_set) != pcmk_rc_ok) {
1164         return;
1165     }
1166     if (expanded_xml != NULL) {
1167         orig_xml = xml_obj;
1168         xml_obj = expanded_xml;
1169     }
1170 
1171     // If the constraint has resource sets, unpack them
1172     for (set = first_named_child(xml_obj, XML_CONS_TAG_RSC_SET);
1173          set != NULL; set = crm_next_same_xml(set)) {
1174 
1175         set = expand_idref(set, data_set->input);
1176         if ((set == NULL) // Configuration error, message already logged
1177             || (unpack_order_set(set, kind, invert, data_set) != pcmk_rc_ok)) {
1178 
1179             if (expanded_xml != NULL) {
1180                 free_xml(expanded_xml);
1181             }
1182             return;
1183         }
1184 
1185         if (last != NULL) {
1186 
1187             if (order_rsc_sets(id, last, set, kind, data_set,
1188                                symmetry) != pcmk_rc_ok) {
1189                 if (expanded_xml != NULL) {
1190                     free_xml(expanded_xml);
1191                 }
1192                 return;
1193             }
1194 
1195             if ((symmetry == ordering_symmetric)
1196                 && (order_rsc_sets(id, set, last, kind, data_set,
1197                                    ordering_symmetric_inverse) != pcmk_rc_ok)) {
1198                 if (expanded_xml != NULL) {
1199                     free_xml(expanded_xml);
1200                 }
1201                 return;
1202             }
1203 
1204         }
1205         last = set;
1206     }
1207 
1208     if (expanded_xml) {
1209         free_xml(expanded_xml);
1210         xml_obj = orig_xml;
1211     }
1212 
1213     // If the constraint has no resource sets, unpack it as a simple ordering
1214     if (last == NULL) {
1215         return unpack_simple_rsc_order(xml_obj, data_set);
1216     }
1217 }
1218 
1219 static bool
1220 ordering_is_invalid(pe_action_t *action, pe_action_wrapper_t *input)
     /* [previous][next][first][last][top][bottom][index][help] */
1221 {
1222     /* Prevent user-defined ordering constraints between resources
1223      * running in a guest node and the resource that defines that node.
1224      */
1225     if (!pcmk_is_set(input->type, pe_order_preserve)
1226         && (input->action->rsc != NULL)
1227         && pcmk__rsc_corresponds_to_guest(action->rsc, input->action->node)) {
1228 
1229         crm_warn("Invalid ordering constraint between %s and %s",
1230                  input->action->rsc->id, action->rsc->id);
1231         return true;
1232     }
1233 
1234     /* If there's an order like
1235      * "rscB_stop node2"-> "load_stopped_node2" -> "rscA_migrate_to node1"
1236      *
1237      * then rscA is being migrated from node1 to node2, while rscB is being
1238      * migrated from node2 to node1. If there would be a graph loop,
1239      * break the order "load_stopped_node2" -> "rscA_migrate_to node1".
1240      */
1241     if ((input->type == pe_order_load) && action->rsc
1242         && pcmk__str_eq(action->task, RSC_MIGRATE, pcmk__str_casei)
1243         && pcmk__graph_has_loop(action, action, input)) {
1244         return true;
1245     }
1246 
1247     return false;
1248 }
1249 
1250 void
1251 pcmk__disable_invalid_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1252 {
1253     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
1254         pe_action_t *action = (pe_action_t *) iter->data;
1255         pe_action_wrapper_t *input = NULL;
1256 
1257         for (GList *input_iter = action->actions_before;
1258              input_iter != NULL; input_iter = input_iter->next) {
1259 
1260             input = (pe_action_wrapper_t *) input_iter->data;
1261             if (ordering_is_invalid(action, input)) {
1262                 input->type = pe_order_none;
1263             }
1264         }
1265     }
1266 }
1267 
1268 /*!
1269  * \internal
1270  * \brief Order stops on a node before the node's shutdown
1271  *
1272  * \param[in] node         Node being shut down
1273  * \param[in] shutdown_op  Shutdown action for node
1274  * \param[in] data_set     Cluster working set
1275  */
1276 void
1277 pcmk__order_stops_before_shutdown(pe_node_t *node, pe_action_t *shutdown_op,
     /* [previous][next][first][last][top][bottom][index][help] */
1278                                   pe_working_set_t *data_set)
1279 {
1280     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
1281         pe_action_t *action = (pe_action_t *) iter->data;
1282 
1283         // Only stops on the node shutting down are relevant
1284         if ((action->rsc == NULL) || (action->node == NULL)
1285             || (action->node->details != node->details)
1286             || !pcmk__str_eq(action->task, RSC_STOP, pcmk__str_casei)) {
1287             continue;
1288         }
1289 
1290         // Resources and nodes in maintenance mode won't be touched
1291 
1292         if (pcmk_is_set(action->rsc->flags, pe_rsc_maintenance)) {
1293             pe_rsc_trace(action->rsc,
1294                          "Not ordering %s before %s shutdown because "
1295                          "resource in maintenance mode",
1296                          action->uuid, node->details->uname);
1297             continue;
1298 
1299         } else if (node->details->maintenance) {
1300             pe_rsc_trace(action->rsc,
1301                          "Not ordering %s before %s shutdown because "
1302                          "node in maintenance mode",
1303                          action->uuid, node->details->uname);
1304             continue;
1305         }
1306 
1307         /* Don't touch a resource that is unmanaged or blocked, to avoid
1308          * blocking the shutdown (though if another action depends on this one,
1309          * we may still end up blocking)
1310          */
1311         if (!pcmk_any_flags_set(action->rsc->flags,
1312                                 pe_rsc_managed|pe_rsc_block)) {
1313             pe_rsc_trace(action->rsc,
1314                          "Not ordering %s before %s shutdown because "
1315                          "resource is unmanaged or blocked",
1316                          action->uuid, node->details->uname);
1317             continue;
1318         }
1319 
1320         pe_rsc_trace(action->rsc, "Ordering %s before %s shutdown",
1321                      action->uuid, node->details->uname);
1322         pe__clear_action_flags(action, pe_action_optional);
1323         pcmk__new_ordering(action->rsc, NULL, action, NULL,
1324                            strdup(CRM_OP_SHUTDOWN), shutdown_op,
1325                            pe_order_optional|pe_order_runnable_left, data_set);
1326     }
1327 }
1328 
1329 /*!
1330  * \brief Find resource actions matching directly or as child
1331  *
1332  * \param[in] rsc           Resource to check
1333  * \param[in] original_key  Action key to search for (possibly referencing
1334  *                          parent of \rsc)
1335  *
1336  * \return Newly allocated list of matching actions
1337  * \note It is the caller's responsibility to free the result with g_list_free()
1338  */
1339 static GList *
1340 find_actions_by_task(pe_resource_t *rsc, const char *original_key)
     /* [previous][next][first][last][top][bottom][index][help] */
1341 {
1342     // Search under given task key directly
1343     GList *list = find_actions(rsc->actions, original_key, NULL);
1344 
1345     if (list == NULL) {
1346         // Search again using this resource's ID
1347         char *key = NULL;
1348         char *task = NULL;
1349         guint interval_ms = 0;
1350 
1351         if (parse_op_key(original_key, NULL, &task, &interval_ms)) {
1352             key = pcmk__op_key(rsc->id, task, interval_ms);
1353             list = find_actions(rsc->actions, key, NULL);
1354             free(key);
1355             free(task);
1356         } else {
1357             crm_err("Invalid operation key (bug?): %s", original_key);
1358         }
1359     }
1360     return list;
1361 }
1362 
1363 static void
1364 rsc_order_then(pe_action_t *lh_action, pe_resource_t *rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
1365                pe__ordering_t *order)
1366 {
1367     GList *rh_actions = NULL;
1368     pe_action_t *rh_action = NULL;
1369     enum pe_ordering type;
1370 
1371     CRM_CHECK(rsc != NULL, return);
1372     CRM_CHECK(order != NULL, return);
1373 
1374     type = order->type;
1375     rh_action = order->rh_action;
1376     crm_trace("Applying ordering constraint %d (then: %s)", order->id, rsc->id);
1377 
1378     if (rh_action != NULL) {
1379         rh_actions = g_list_prepend(NULL, rh_action);
1380 
1381     } else if (rsc != NULL) {
1382         rh_actions = find_actions_by_task(rsc, order->rh_action_task);
1383     }
1384 
1385     if (rh_actions == NULL) {
1386         pe_rsc_trace(rsc,
1387                      "Ignoring constraint %d: then (%s for %s) not found",
1388                      order->id, order->rh_action_task, rsc->id);
1389         return;
1390     }
1391 
1392     if ((lh_action != NULL) && (lh_action->rsc == rsc)
1393         && pcmk_is_set(lh_action->flags, pe_action_dangle)) {
1394 
1395         pe_rsc_trace(rsc, "Detected dangling operation %s -> %s",
1396                      lh_action->uuid, order->rh_action_task);
1397         pe__clear_order_flags(type, pe_order_implies_then);
1398     }
1399 
1400     for (GList *gIter = rh_actions; gIter != NULL; gIter = gIter->next) {
1401         pe_action_t *rh_action_iter = (pe_action_t *) gIter->data;
1402 
1403         if (lh_action) {
1404             order_actions(lh_action, rh_action_iter, type);
1405 
1406         } else if (type & pe_order_implies_then) {
1407             pe__clear_action_flags(rh_action_iter, pe_action_runnable);
1408             crm_warn("Unrunnable %s %#.6x", rh_action_iter->uuid, type);
1409         } else {
1410             crm_warn("neither %s %#.6x", rh_action_iter->uuid, type);
1411         }
1412     }
1413 
1414     g_list_free(rh_actions);
1415 }
1416 
1417 static void
1418 rsc_order_first(pe_resource_t *lh_rsc, pe__ordering_t *order,
     /* [previous][next][first][last][top][bottom][index][help] */
1419                 pe_working_set_t *data_set)
1420 {
1421     GList *lh_actions = NULL;
1422     pe_action_t *lh_action = order->lh_action;
1423     pe_resource_t *rh_rsc = order->rh_rsc;
1424 
1425     CRM_ASSERT(lh_rsc != NULL);
1426     pe_rsc_trace(lh_rsc, "Applying ordering constraint %d (first: %s)",
1427                  order->id, lh_rsc->id);
1428 
1429     if (lh_action != NULL) {
1430         lh_actions = g_list_prepend(NULL, lh_action);
1431 
1432     } else {
1433         lh_actions = find_actions_by_task(lh_rsc, order->lh_action_task);
1434     }
1435 
1436     if ((lh_actions == NULL) && (lh_rsc == rh_rsc)) {
1437         pe_rsc_trace(lh_rsc,
1438                      "Ignoring constraint %d: first (%s for %s) not found",
1439                      order->id, order->lh_action_task, lh_rsc->id);
1440 
1441     } else if (lh_actions == NULL) {
1442         char *key = NULL;
1443         char *op_type = NULL;
1444         guint interval_ms = 0;
1445 
1446         parse_op_key(order->lh_action_task, NULL, &op_type, &interval_ms);
1447         key = pcmk__op_key(lh_rsc->id, op_type, interval_ms);
1448 
1449         if ((lh_rsc->fns->state(lh_rsc, TRUE) == RSC_ROLE_STOPPED)
1450             && pcmk__str_eq(op_type, RSC_STOP, pcmk__str_casei)) {
1451             free(key);
1452             pe_rsc_trace(lh_rsc,
1453                          "Ignoring constraint %d: first (%s for %s) not found",
1454                          order->id, order->lh_action_task, lh_rsc->id);
1455 
1456         } else if ((lh_rsc->fns->state(lh_rsc, TRUE) == RSC_ROLE_UNPROMOTED)
1457                    && pcmk__str_eq(op_type, RSC_DEMOTE, pcmk__str_casei)) {
1458             free(key);
1459             pe_rsc_trace(lh_rsc,
1460                          "Ignoring constraint %d: first (%s for %s) not found",
1461                          order->id, order->lh_action_task, lh_rsc->id);
1462 
1463         } else {
1464             pe_rsc_trace(lh_rsc,
1465                          "Creating first (%s for %s) for constraint %d ",
1466                          order->lh_action_task, lh_rsc->id, order->id);
1467             lh_action = custom_action(lh_rsc, key, op_type, NULL, TRUE, TRUE, data_set);
1468             lh_actions = g_list_prepend(NULL, lh_action);
1469         }
1470 
1471         free(op_type);
1472     }
1473 
1474     if (rh_rsc == NULL) {
1475         if (order->rh_action == NULL) {
1476             pe_rsc_trace(lh_rsc, "Ignoring constraint %d: then not found",
1477                          order->id);
1478             return;
1479         }
1480         rh_rsc = order->rh_action->rsc;
1481     }
1482     for (GList *gIter = lh_actions; gIter != NULL; gIter = gIter->next) {
1483         lh_action = (pe_action_t *) gIter->data;
1484 
1485         if (rh_rsc == NULL) {
1486             order_actions(lh_action, order->rh_action, order->type);
1487 
1488         } else {
1489             rsc_order_then(lh_action, rh_rsc, order);
1490         }
1491     }
1492 
1493     g_list_free(lh_actions);
1494 }
1495 
1496 void
1497 pcmk__apply_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1498 {
1499     crm_trace("Applying ordering constraints");
1500 
1501     /* Don't ask me why, but apparently they need to be processed in
1502      * the order they were created in... go figure
1503      *
1504      * Also g_list_append() has horrendous performance characteristics
1505      * So we need to use g_list_prepend() and then reverse the list here
1506      */
1507     data_set->ordering_constraints = g_list_reverse(data_set->ordering_constraints);
1508 
1509     for (GList *gIter = data_set->ordering_constraints;
1510          gIter != NULL; gIter = gIter->next) {
1511 
1512         pe__ordering_t *order = gIter->data;
1513         pe_resource_t *rsc = order->lh_rsc;
1514 
1515         if (rsc != NULL) {
1516             rsc_order_first(rsc, order, data_set);
1517             continue;
1518         }
1519 
1520         rsc = order->rh_rsc;
1521         if (rsc != NULL) {
1522             rsc_order_then(order->lh_action, rsc, order);
1523 
1524         } else {
1525             crm_trace("Applying ordering constraint %d (non-resource actions)",
1526                       order->id);
1527             order_actions(order->lh_action, order->rh_action, order->type);
1528         }
1529     }
1530 
1531     g_list_foreach(data_set->actions, (GFunc) pcmk__block_colocated_starts,
1532                    data_set);
1533 
1534     crm_trace("Ordering probes");
1535     pcmk__order_probes(data_set);
1536 
1537     crm_trace("Updating %d actions", g_list_length(data_set->actions));
1538     g_list_foreach(data_set->actions,
1539                    (GFunc) pcmk__update_action_for_orderings, data_set);
1540 
1541     pcmk__disable_invalid_orderings(data_set);
1542 }
1543 
1544 /*!
1545  * \internal
1546  * \brief Order a given action after each action in a given list
1547  *
1548  * \param[in] after   "After" action
1549  * \param[in] list    List of "before" actions
1550  */
1551 void
1552 pcmk__order_after_each(pe_action_t *after, GList *list)
     /* [previous][next][first][last][top][bottom][index][help] */
1553 {
1554     const char *after_desc = (after->task == NULL)? after->uuid : after->task;
1555 
1556     for (GList *iter = list; iter != NULL; iter = iter->next) {
1557         pe_action_t *before = (pe_action_t *) iter->data;
1558         const char *before_desc = before->task? before->task : before->uuid;
1559 
1560         crm_debug("Ordering %s on %s before %s on %s",
1561                   before_desc, crm_str(before->node->details->uname),
1562                   after_desc, crm_str(after->node->details->uname));
1563         order_actions(before, after, pe_order_optional);
1564     }
1565 }

/* [previous][next][first][last][top][bottom][index][help] */