root/lib/pacemaker/pcmk_sched_ordering.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. invert_action
  2. get_ordering_type
  3. get_ordering_symmetry
  4. ordering_flags_for_kind
  5. get_ordering_resource
  6. get_minimum_first_instances
  7. clone_min_ordering
  8. inverse_ordering
  9. unpack_simple_rsc_order
  10. task_from_action_or_key
  11. handle_migration_ordering
  12. pcmk__new_ordering
  13. unpack_order_set
  14. order_rsc_sets
  15. unpack_order_tags
  16. pcmk__unpack_ordering
  17. ordering_is_invalid
  18. pcmk__disable_invalid_orderings
  19. pcmk__order_stops_before_shutdown
  20. find_actions_by_task
  21. rsc_order_then
  22. rsc_order_first
  23. pcmk__apply_orderings

   1 /*
   2  * Copyright 2004-2021 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <stdbool.h>
  13 #include <glib.h>
  14 
  15 #include <crm/crm.h>
  16 #include <pacemaker-internal.h>
  17 #include "libpacemaker_private.h"
  18 
  19 enum pe_order_kind {
  20     pe_order_kind_optional,
  21     pe_order_kind_mandatory,
  22     pe_order_kind_serialize,
  23 };
  24 
  25 enum ordering_symmetry {
  26     ordering_asymmetric,        // the only relation in an asymmetric ordering
  27     ordering_symmetric,         // the normal relation in a symmetric ordering
  28     ordering_symmetric_inverse, // the inverse relation in a symmetric ordering
  29 };
  30 
  31 #define EXPAND_CONSTRAINT_IDREF(__set, __rsc, __name) do {                      \
  32         __rsc = pcmk__find_constraint_resource(data_set->resources, __name);    \
  33         if (__rsc == NULL) {                                                    \
  34             pcmk__config_err("%s: No resource found for %s", __set, __name);    \
  35             return pcmk_rc_schema_validation;                                   \
  36         }                                                                       \
  37     } while (0)
  38 
  39 static const char *
  40 invert_action(const char *action)
     /* [previous][next][first][last][top][bottom][index][help] */
  41 {
  42     if (pcmk__str_eq(action, RSC_START, pcmk__str_casei)) {
  43         return RSC_STOP;
  44 
  45     } else if (pcmk__str_eq(action, RSC_STOP, pcmk__str_casei)) {
  46         return RSC_START;
  47 
  48     } else if (pcmk__str_eq(action, RSC_PROMOTE, pcmk__str_casei)) {
  49         return RSC_DEMOTE;
  50 
  51     } else if (pcmk__str_eq(action, RSC_DEMOTE, pcmk__str_casei)) {
  52         return RSC_PROMOTE;
  53 
  54     } else if (pcmk__str_eq(action, RSC_PROMOTED, pcmk__str_casei)) {
  55         return RSC_DEMOTED;
  56 
  57     } else if (pcmk__str_eq(action, RSC_DEMOTED, pcmk__str_casei)) {
  58         return RSC_PROMOTED;
  59 
  60     } else if (pcmk__str_eq(action, RSC_STARTED, pcmk__str_casei)) {
  61         return RSC_STOPPED;
  62 
  63     } else if (pcmk__str_eq(action, RSC_STOPPED, pcmk__str_casei)) {
  64         return RSC_STARTED;
  65     }
  66     crm_warn("Unknown action '%s' specified in order constraint", action);
  67     return NULL;
  68 }
  69 
  70 static enum pe_order_kind
  71 get_ordering_type(xmlNode *xml_obj)
     /* [previous][next][first][last][top][bottom][index][help] */
  72 {
  73     enum pe_order_kind kind_e = pe_order_kind_mandatory;
  74     const char *kind = crm_element_value(xml_obj, XML_ORDER_ATTR_KIND);
  75 
  76     if (kind == NULL) {
  77         const char *score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
  78 
  79         kind_e = pe_order_kind_mandatory;
  80 
  81         if (score) {
  82             // @COMPAT deprecated informally since 1.0.7, formally since 2.0.1
  83             int score_i = char2score(score);
  84 
  85             if (score_i == 0) {
  86                 kind_e = pe_order_kind_optional;
  87             }
  88             pe_warn_once(pe_wo_order_score,
  89                          "Support for 'score' in rsc_order is deprecated "
  90                          "and will be removed in a future release "
  91                          "(use 'kind' instead)");
  92         }
  93 
  94     } else if (pcmk__str_eq(kind, "Mandatory", pcmk__str_casei)) {
  95         kind_e = pe_order_kind_mandatory;
  96 
  97     } else if (pcmk__str_eq(kind, "Optional", pcmk__str_casei)) {
  98         kind_e = pe_order_kind_optional;
  99 
 100     } else if (pcmk__str_eq(kind, "Serialize", pcmk__str_casei)) {
 101         kind_e = pe_order_kind_serialize;
 102 
 103     } else {
 104         pcmk__config_err("Resetting '" XML_ORDER_ATTR_KIND "' for constraint "
 105                          "'%s' to Mandatory because '%s' is not valid",
 106                          crm_str(ID(xml_obj)), kind);
 107     }
 108     return kind_e;
 109 }
 110 
 111 /*!
 112  * \internal
 113  * \brief Get ordering symmetry from XML
 114  *
 115  * \param[in] xml_obj               Ordering XML
 116  * \param[in] parent_kind           Default ordering kind
 117  * \param[in] parent_symmetrical_s  Parent element's symmetrical setting, if any
 118  *
 119  * \retval ordering_symmetric   Ordering is symmetric
 120  * \retval ordering_asymmetric  Ordering is asymmetric
 121  */
 122 static enum ordering_symmetry
 123 get_ordering_symmetry(xmlNode *xml_obj, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 124                       const char *parent_symmetrical_s)
 125 {
 126     const char *symmetrical_s = NULL;
 127     enum pe_order_kind kind = parent_kind; // Default to parent's kind
 128 
 129     // Check ordering XML for explicit kind
 130     if ((crm_element_value(xml_obj, XML_ORDER_ATTR_KIND) != NULL)
 131         || (crm_element_value(xml_obj, XML_RULE_ATTR_SCORE) != NULL)) {
 132         kind = get_ordering_type(xml_obj);
 133     }
 134 
 135     // Check ordering XML (and parent) for explicit symmetrical setting
 136     symmetrical_s = crm_element_value(xml_obj, XML_CONS_ATTR_SYMMETRICAL);
 137     if (symmetrical_s == NULL) {
 138         symmetrical_s = parent_symmetrical_s;
 139     }
 140     if (symmetrical_s != NULL) {
 141         if (crm_is_true(symmetrical_s)) {
 142             if (kind == pe_order_kind_serialize) {
 143                 pcmk__config_warn("Ignoring " XML_CONS_ATTR_SYMMETRICAL
 144                                   " for '%s' because not valid with "
 145                                   XML_ORDER_ATTR_KIND " of 'Serialize'",
 146                                   ID(xml_obj));
 147             } else {
 148                 return ordering_symmetric;
 149             }
 150         }
 151         return ordering_asymmetric;
 152     }
 153 
 154     // Use default symmetry
 155     if (kind == pe_order_kind_serialize) {
 156         return ordering_asymmetric;
 157     }
 158     return ordering_symmetric;
 159 }
 160 
 161 /*!
 162  * \internal
 163  * \brief Get ordering flags appropriate to ordering kind
 164  *
 165  * \param[in] kind      Ordering kind
 166  * \param[in] first     Action name for 'first' action
 167  * \param[in] symmetry  This ordering's symmetry role
 168  *
 169  * \return Minimal ordering flags appropriate to \p kind
 170  */
 171 static enum pe_ordering
 172 ordering_flags_for_kind(enum pe_order_kind kind, const char *first,
     /* [previous][next][first][last][top][bottom][index][help] */
 173                         enum ordering_symmetry symmetry)
 174 {
 175     enum pe_ordering flags = pe_order_none; // so we trace-log all flags set
 176 
 177     pe__set_order_flags(flags, pe_order_optional);
 178 
 179     switch (kind) {
 180         case pe_order_kind_optional:
 181             break;
 182 
 183         case pe_order_kind_serialize:
 184             pe__set_order_flags(flags, pe_order_serialize_only);
 185             break;
 186 
 187         case pe_order_kind_mandatory:
 188             switch (symmetry) {
 189                 case ordering_asymmetric:
 190                     pe__set_order_flags(flags, pe_order_asymmetrical);
 191                     break;
 192 
 193                 case ordering_symmetric:
 194                     pe__set_order_flags(flags, pe_order_implies_then);
 195                     if (pcmk__strcase_any_of(first, RSC_START, RSC_PROMOTE,
 196                                              NULL)) {
 197                         pe__set_order_flags(flags, pe_order_runnable_left);
 198                     }
 199                     break;
 200 
 201                 case ordering_symmetric_inverse:
 202                     pe__set_order_flags(flags, pe_order_implies_first);
 203                     break;
 204             }
 205             break;
 206     }
 207     return flags;
 208 }
 209 
 210 /*!
 211  * \internal
 212  * \brief Find resource corresponding to ID specified in ordering
 213  *
 214  * \param[in] xml            Ordering XML
 215  * \param[in] resource_attr  XML attribute name for resource ID
 216  * \param[in] instance_attr  XML attribute name for instance number
 217  * \param[in] data_set       Cluster working set
 218  *
 219  * \return Resource corresponding to \p id, or NULL if none
 220  */
 221 static pe_resource_t *
 222 get_ordering_resource(xmlNode *xml, const char *resource_attr,
     /* [previous][next][first][last][top][bottom][index][help] */
 223                       const char *instance_attr, pe_working_set_t *data_set)
 224 {
 225     pe_resource_t *rsc = NULL;
 226     const char *rsc_id = crm_element_value(xml, resource_attr);
 227     const char *instance_id = crm_element_value(xml, instance_attr);
 228 
 229     if (rsc_id == NULL) {
 230         pcmk__config_err("Ignoring constraint '%s' without %s",
 231                          ID(xml), resource_attr);
 232         return NULL;
 233     }
 234 
 235     rsc = pcmk__find_constraint_resource(data_set->resources, rsc_id);
 236     if (rsc == NULL) {
 237         pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 238                          "does not exist", ID(xml), rsc_id);
 239         return NULL;
 240     }
 241 
 242     if (instance_id != NULL) {
 243         if (!pe_rsc_is_clone(rsc)) {
 244             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 245                              "is not a clone but instance '%s' was requested",
 246                              ID(xml), rsc_id, instance_id);
 247             return NULL;
 248         }
 249         rsc = find_clone_instance(rsc, instance_id, data_set);
 250         if (rsc == NULL) {
 251             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 252                              "does not have an instance '%s'",
 253                              "'%s'", ID(xml), rsc_id, instance_id);
 254             return NULL;
 255         }
 256     }
 257     return rsc;
 258 }
 259 
 260 /*!
 261  * \internal
 262  * \brief Determine minimum number of 'first' instances required in ordering
 263  *
 264  * \param[in] rsc  'First' resource in ordering
 265  * \param[in] xml  Ordering XML
 266  *
 267  * \return Minimum 'first' instances required (or 0 if not applicable)
 268  */
 269 static int
 270 get_minimum_first_instances(pe_resource_t *rsc, xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 271 {
 272     if (pe_rsc_is_clone(rsc)) {
 273         const char *clone_min = NULL;
 274 
 275         clone_min = g_hash_table_lookup(rsc->meta,
 276                                         XML_RSC_ATTR_INCARNATION_MIN);
 277         if (clone_min != NULL) {
 278             int clone_min_int = 0;
 279 
 280             pcmk__scan_min_int(clone_min, &clone_min_int, 0);
 281             return clone_min_int;
 282         }
 283 
 284         /* @COMPAT 1.1.13:
 285          * require-all=false is deprecated equivalent of clone-min=1
 286          */
 287         clone_min = crm_element_value(xml, "require-all");
 288         if (clone_min != NULL) {
 289             pe_warn_once(pe_wo_require_all,
 290                          "Support for require-all in ordering constraints "
 291                          "is deprecated and will be removed in a future release"
 292                          " (use clone-min clone meta-attribute instead)");
 293             if (!crm_is_true(clone_min)) {
 294                 return 1;
 295             }
 296         }
 297     }
 298     return 0;
 299 }
 300 
 301 /*!
 302  * \internal
 303  * \brief Create orderings for a constraint with clone-min > 0
 304  *
 305  * \param[in] id            Ordering ID
 306  * \param[in] rsc_first     'First' resource in ordering (a clone)
 307  * \param[in] action_first  'First' action in ordering
 308  * \param[in] rsc_then      'Then' resource in ordering
 309  * \param[in] action_then   'Then' action in ordering
 310  * \param[in] flags         Ordering flags
 311  * \param[in] clone_min     Minimum required instances of 'first'
 312  * \param[in] data_set      Cluster working set
 313  */
 314 static void
 315 clone_min_ordering(const char *id,
     /* [previous][next][first][last][top][bottom][index][help] */
 316                    pe_resource_t *rsc_first, const char *action_first,
 317                    pe_resource_t *rsc_then, const char *action_then,
 318                    enum pe_ordering flags, int clone_min,
 319                    pe_working_set_t *data_set)
 320 {
 321     // Create a pseudo-action for when the minimum instances are active
 322     char *task = crm_strdup_printf(CRM_OP_RELAXED_CLONE ":%s", id);
 323     pe_action_t *clone_min_met = get_pseudo_op(task, data_set);
 324 
 325     free(task);
 326 
 327     /* Require the pseudo-action to have the required number of actions to be
 328      * considered runnable before allowing the pseudo-action to be runnable.
 329      */
 330     clone_min_met->required_runnable_before = clone_min;
 331     pe__set_action_flags(clone_min_met, pe_action_requires_any);
 332 
 333     // Order the actions for each clone instance before the pseudo-action
 334     for (GList *rIter = rsc_first->children; rIter != NULL;
 335          rIter = rIter->next) {
 336 
 337         pe_resource_t *child = rIter->data;
 338 
 339         pcmk__new_ordering(child, pcmk__op_key(child->id, action_first, 0),
 340                            NULL, NULL, NULL, clone_min_met,
 341                            pe_order_one_or_more|pe_order_implies_then_printed,
 342                            data_set);
 343     }
 344 
 345     // Order "then" action after the pseudo-action (if runnable)
 346     pcmk__new_ordering(NULL, NULL, clone_min_met, rsc_then,
 347                        pcmk__op_key(rsc_then->id, action_then, 0),
 348                        NULL, flags|pe_order_runnable_left, data_set);
 349 }
 350 
 351 /*!
 352  * \internal
 353  * \brief Update ordering flags for restart-type=restart
 354  *
 355  * \param[in]  rsc    'Then' resource in ordering
 356  * \param[in]  kind   Ordering kind
 357  * \param[in]  flag   Ordering flag to set (when applicable)
 358  * \param[out] flags  Ordering flag set to update
 359  *
 360  * \compat The restart-type resource meta-attribute is deprecated. Eventually,
 361  *         it will be removed, and pe_restart_ignore will be the only behavior,
 362  *         at which time this can just be removed entirely.
 363  */
 364 #define handle_restart_type(rsc, kind, flag, flags) do {        \
 365         if (((kind) == pe_order_kind_optional)                  \
 366             && ((rsc)->restart_type == pe_restart_restart)) {   \
 367             pe__set_order_flags((flags), (flag));               \
 368         }                                                       \
 369     } while (0)
 370 
 371 /*!
 372  * \internal
 373  * \brief Create new ordering for inverse of symmetric constraint
 374  *
 375  * \param[in] id            Ordering ID (for logging only)
 376  * \param[in] kind          Ordering kind
 377  * \param[in] rsc_first     'First' resource in ordering (a clone)
 378  * \param[in] action_first  'First' action in ordering
 379  * \param[in] rsc_then      'Then' resource in ordering
 380  * \param[in] action_then   'Then' action in ordering
 381  * \param[in] data_set      Cluster working set
 382  */
 383 static void
 384 inverse_ordering(const char *id, enum pe_order_kind kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 385                  pe_resource_t *rsc_first, const char *action_first,
 386                  pe_resource_t *rsc_then, const char *action_then,
 387                  pe_working_set_t *data_set)
 388 {
 389     action_then = invert_action(action_then);
 390     action_first = invert_action(action_first);
 391     if ((action_then == NULL) || (action_first == NULL)) {
 392         pcmk__config_warn("Cannot invert constraint '%s' "
 393                           "(please specify inverse manually)", id);
 394     } else {
 395         enum pe_ordering flags = ordering_flags_for_kind(kind, action_first,
 396                                                          ordering_symmetric_inverse);
 397 
 398         handle_restart_type(rsc_then, kind, pe_order_implies_first, flags);
 399         pcmk__order_resource_actions(rsc_then, action_then, rsc_first,
 400                                      action_first, flags, data_set);
 401     }
 402 }
 403 
 404 static void
 405 unpack_simple_rsc_order(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 406 {
 407     pe_resource_t *rsc_then = NULL;
 408     pe_resource_t *rsc_first = NULL;
 409     int min_required_before = 0;
 410     enum pe_order_kind kind = pe_order_kind_mandatory;
 411     enum pe_ordering cons_weight = pe_order_none;
 412     enum ordering_symmetry symmetry;
 413 
 414     const char *action_then = NULL;
 415     const char *action_first = NULL;
 416     const char *id = NULL;
 417 
 418     CRM_CHECK(xml_obj != NULL, return);
 419 
 420     id = crm_element_value(xml_obj, XML_ATTR_ID);
 421     if (id == NULL) {
 422         pcmk__config_err("Ignoring <%s> constraint without " XML_ATTR_ID,
 423                          crm_element_name(xml_obj));
 424         return;
 425     }
 426 
 427     rsc_first = get_ordering_resource(xml_obj, XML_ORDER_ATTR_FIRST,
 428                                       XML_ORDER_ATTR_FIRST_INSTANCE,
 429                                       data_set);
 430     if (rsc_first == NULL) {
 431         return;
 432     }
 433 
 434     rsc_then = get_ordering_resource(xml_obj, XML_ORDER_ATTR_THEN,
 435                                      XML_ORDER_ATTR_THEN_INSTANCE,
 436                                      data_set);
 437     if (rsc_then == NULL) {
 438         return;
 439     }
 440 
 441     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
 442     if (action_first == NULL) {
 443         action_first = RSC_START;
 444     }
 445 
 446     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
 447     if (action_then == NULL) {
 448         action_then = action_first;
 449     }
 450 
 451     kind = get_ordering_type(xml_obj);
 452 
 453     symmetry = get_ordering_symmetry(xml_obj, kind, NULL);
 454     cons_weight = ordering_flags_for_kind(kind, action_first, symmetry);
 455 
 456     handle_restart_type(rsc_then, kind, pe_order_implies_then, cons_weight);
 457 
 458     /* If there is a minimum number of instances that must be runnable before
 459      * the 'then' action is runnable, we use a pseudo-action for convenience:
 460      * minimum number of clone instances have runnable actions ->
 461      * pseudo-action is runnable -> dependency is runnable.
 462      */
 463     min_required_before = get_minimum_first_instances(rsc_first, xml_obj);
 464     if (min_required_before > 0) {
 465         clone_min_ordering(id, rsc_first, action_first, rsc_then, action_then,
 466                            cons_weight, min_required_before, data_set);
 467     } else {
 468         pcmk__order_resource_actions(rsc_first, action_first, rsc_then,
 469                                      action_then, cons_weight, data_set);
 470     }
 471 
 472     if (symmetry == ordering_symmetric) {
 473         inverse_ordering(id, kind, rsc_first, action_first,
 474                          rsc_then, action_then, data_set);
 475     }
 476 }
 477 
 478 static char *
 479 task_from_action_or_key(pe_action_t *action, const char *key)
     /* [previous][next][first][last][top][bottom][index][help] */
 480 {
 481     char *res = NULL;
 482 
 483     if (action != NULL) {
 484         res = strdup(action->task);
 485     } else if (key != NULL) {
 486         parse_op_key(key, NULL, &res, NULL);
 487     }
 488     return res;
 489 }
 490 
 491 /*!
 492  * \internal
 493  * \brief Apply start/stop orderings to migrations
 494  *
 495  * Orderings involving start, stop, demote, and promote actions must be honored
 496  * during a migration as well, so duplicate any such ordering for the
 497  * corresponding migration actions.
 498  *
 499  * \param[in] order     Ordering constraint to check
 500  * \param[in] data_set  Cluster working set
 501  */
 502 static void
 503 handle_migration_ordering(pe__ordering_t *order, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 504 {
 505     char *lh_task = NULL;
 506     char *rh_task = NULL;
 507     bool rh_migratable;
 508     bool lh_migratable;
 509 
 510     // Only orderings between two different resources are relevant
 511     if ((order->lh_rsc == NULL) || (order->rh_rsc == NULL)
 512         || (order->lh_rsc == order->rh_rsc)) {
 513         return;
 514     }
 515 
 516     // Constraints between a parent resource and its children are not relevant
 517     if (is_parent(order->lh_rsc, order->rh_rsc)
 518         || is_parent(order->rh_rsc, order->lh_rsc)) {
 519         return;
 520     }
 521 
 522     // Only orderings involving at least one migratable resource are relevant
 523     lh_migratable = pcmk_is_set(order->lh_rsc->flags, pe_rsc_allow_migrate);
 524     rh_migratable = pcmk_is_set(order->rh_rsc->flags, pe_rsc_allow_migrate);
 525     if (!lh_migratable && !rh_migratable) {
 526         return;
 527     }
 528 
 529     // Check which actions are involved
 530     lh_task = task_from_action_or_key(order->lh_action, order->lh_action_task);
 531     rh_task = task_from_action_or_key(order->rh_action, order->rh_action_task);
 532     if ((lh_task == NULL) || (rh_task == NULL)) {
 533         goto cleanup_order;
 534     }
 535 
 536     if (pcmk__str_eq(lh_task, RSC_START, pcmk__str_casei)
 537         && pcmk__str_eq(rh_task, RSC_START, pcmk__str_casei)) {
 538 
 539         int flags = pe_order_optional;
 540 
 541         if (lh_migratable && rh_migratable) {
 542             /* A start then B start
 543              * -> A migrate_from then B migrate_to */
 544             pcmk__new_ordering(order->lh_rsc,
 545                                pcmk__op_key(order->lh_rsc->id, RSC_MIGRATED, 0),
 546                                NULL, order->rh_rsc,
 547                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 548                                NULL, flags, data_set);
 549         }
 550 
 551         if (rh_migratable) {
 552             if (lh_migratable) {
 553                 pe__set_order_flags(flags, pe_order_apply_first_non_migratable);
 554             }
 555 
 556             /* A start then B start
 557              * -> A start then B migrate_to (if start is not part of a
 558              *    migration)
 559              */
 560             pcmk__new_ordering(order->lh_rsc,
 561                                pcmk__op_key(order->lh_rsc->id, RSC_START, 0),
 562                                NULL, order->rh_rsc,
 563                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 564                                NULL, flags, data_set);
 565         }
 566 
 567     } else if (rh_migratable && pcmk__str_eq(lh_task, RSC_STOP, pcmk__str_casei)
 568                && pcmk__str_eq(rh_task, RSC_STOP, pcmk__str_casei)) {
 569 
 570         int flags = pe_order_optional;
 571 
 572         if (lh_migratable) {
 573             pe__set_order_flags(flags, pe_order_apply_first_non_migratable);
 574         }
 575 
 576         /* For an ordering "stop A then stop B", if A is moving via restart, and
 577          * B is migrating, enforce that B's migrate_to occurs after A's stop.
 578          */
 579         pcmk__new_ordering(order->lh_rsc,
 580                            pcmk__op_key(order->lh_rsc->id, RSC_STOP, 0), NULL,
 581                            order->rh_rsc,
 582                            pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 583                            NULL, flags, data_set);
 584 
 585         // Also order B's migrate_from after A's stop during partial migrations
 586         if (order->rh_rsc->partial_migration_target) {
 587             pcmk__new_ordering(order->lh_rsc,
 588                                pcmk__op_key(order->lh_rsc->id, RSC_STOP, 0),
 589                                NULL, order->rh_rsc,
 590                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATED, 0),
 591                                NULL, flags, data_set);
 592         }
 593 
 594     } else if (pcmk__str_eq(lh_task, RSC_PROMOTE, pcmk__str_casei)
 595                && pcmk__str_eq(rh_task, RSC_START, pcmk__str_casei)) {
 596 
 597         int flags = pe_order_optional;
 598 
 599         if (rh_migratable) {
 600             /* A promote then B start
 601              * -> A promote then B migrate_to */
 602             pcmk__new_ordering(order->lh_rsc,
 603                                pcmk__op_key(order->lh_rsc->id, RSC_PROMOTE, 0),
 604                                NULL, order->rh_rsc,
 605                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 606                                NULL, flags, data_set);
 607         }
 608 
 609     } else if (pcmk__str_eq(lh_task, RSC_DEMOTE, pcmk__str_casei)
 610                && pcmk__str_eq(rh_task, RSC_STOP, pcmk__str_casei)) {
 611 
 612         int flags = pe_order_optional;
 613 
 614         if (rh_migratable) {
 615             /* A demote then B stop
 616              * -> A demote then B migrate_to */
 617             pcmk__new_ordering(order->lh_rsc,
 618                                pcmk__op_key(order->lh_rsc->id, RSC_DEMOTE, 0),
 619                                NULL, order->rh_rsc,
 620                                pcmk__op_key(order->rh_rsc->id, RSC_MIGRATE, 0),
 621                                NULL, flags, data_set);
 622 
 623             // Also order B migrate_from after A demote during partial migrations
 624             if (order->rh_rsc->partial_migration_target) {
 625                 pcmk__new_ordering(order->lh_rsc,
 626                                    pcmk__op_key(order->lh_rsc->id, RSC_DEMOTE, 0),
 627                                    NULL, order->rh_rsc,
 628                                    pcmk__op_key(order->rh_rsc->id, RSC_MIGRATED, 0),
 629                                    NULL, flags, data_set);
 630             }
 631         }
 632     }
 633 
 634 cleanup_order:
 635     free(lh_task);
 636     free(rh_task);
 637 }
 638 
 639 /*!
 640  * \internal
 641  * \brief Create a new ordering between two actions
 642  *
 643  * \param[in] lh_rsc           Resource for 'first' action (if NULL and
 644  *                             \p lh_action is a resource action, that
 645  *                             resource will be used)
 646  * \param[in] lh_action_task   Action key for 'first' action (if NULL and
 647  *                             \p lh_action is not NULL, its UUID will be used)
 648  * \param[in] lh_action        'first' action (if NULL, \p lh_rsc and
 649  *                             \p lh_action_task must be set)
 650  *
 651  * \param[in] rh_rsc           Resource for 'then' action (if NULL and
 652  *                             \p rh_action is a resource action, that
 653  *                             resource will be used)
 654  * \param[in] rh_action_task   Action key for 'then' action (if NULL and
 655  *                             \p rh_action is not NULL, its UUID will be used)
 656  * \param[in] rh_action        'then' action (if NULL, \p rh_rsc and
 657  *                             \p rh_action_task must be set)
 658  *
 659  * \param[in] type             Flag set of enum pe_ordering
 660  * \param[in] data_set         Cluster working set to add ordering to
 661  *
 662  * \note This function takes ownership of lh_action_task and rh_action_task,
 663  *       which do not need to be freed by the caller.
 664  */
 665 void
 666 pcmk__new_ordering(pe_resource_t *lh_rsc, char *lh_action_task,
     /* [previous][next][first][last][top][bottom][index][help] */
 667                    pe_action_t *lh_action, pe_resource_t *rh_rsc,
 668                    char *rh_action_task, pe_action_t *rh_action,
 669                    enum pe_ordering type, pe_working_set_t *data_set)
 670 {
 671     pe__ordering_t *order = NULL;
 672 
 673     // One of action or resource must be specified for each side
 674     CRM_CHECK(((lh_action != NULL) || (lh_rsc != NULL))
 675               && ((rh_action != NULL) || (rh_rsc != NULL)),
 676               free(lh_action_task); free(rh_action_task); return);
 677 
 678     if ((lh_rsc == NULL) && (lh_action != NULL)) {
 679         lh_rsc = lh_action->rsc;
 680     }
 681     if ((rh_rsc == NULL) && (rh_action != NULL)) {
 682         rh_rsc = rh_action->rsc;
 683     }
 684 
 685     order = calloc(1, sizeof(pe__ordering_t));
 686 
 687     order->id = data_set->order_id++;
 688     order->type = type;
 689     order->lh_rsc = lh_rsc;
 690     order->rh_rsc = rh_rsc;
 691     order->lh_action = lh_action;
 692     order->rh_action = rh_action;
 693     order->lh_action_task = lh_action_task;
 694     order->rh_action_task = rh_action_task;
 695 
 696     if ((order->lh_action_task == NULL) && (lh_action != NULL)) {
 697         order->lh_action_task = strdup(lh_action->uuid);
 698     }
 699 
 700     if ((order->rh_action_task == NULL) && (rh_action != NULL)) {
 701         order->rh_action_task = strdup(rh_action->uuid);
 702     }
 703 
 704     if ((order->lh_rsc == NULL) && (lh_action != NULL)) {
 705         order->lh_rsc = lh_action->rsc;
 706     }
 707 
 708     if ((order->rh_rsc == NULL) && (rh_action != NULL)) {
 709         order->rh_rsc = rh_action->rsc;
 710     }
 711 
 712     pe_rsc_trace(lh_rsc, "Created ordering %d for %s then %s",
 713                  (data_set->order_id - 1),
 714                  ((lh_action_task == NULL)? "?" : lh_action_task),
 715                  ((rh_action_task == NULL)? "?" : rh_action_task));
 716 
 717     data_set->ordering_constraints = g_list_prepend(data_set->ordering_constraints,
 718                                                     order);
 719     handle_migration_ordering(order, data_set);
 720 }
 721 
 722 /*!
 723  * \brief Unpack a set in an ordering constraint
 724  *
 725  * \param[in]  set                    Set XML to unpack
 726  * \param[in]  parent_kind            rsc_order XML "kind" attribute
 727  * \param[in]  parent_symmetrical_s   rsc_order XML "symmetrical" attribute
 728  * \param[in]  data_set               Cluster working set
 729  *
 730  * \return Standard Pacemaker return code
 731  */
 732 static int
 733 unpack_order_set(xmlNode *set, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 734                  const char *parent_symmetrical_s, pe_working_set_t *data_set)
 735 {
 736     xmlNode *xml_rsc = NULL;
 737     GList *set_iter = NULL;
 738     GList *resources = NULL;
 739 
 740     pe_resource_t *last = NULL;
 741     pe_resource_t *resource = NULL;
 742 
 743     int local_kind = parent_kind;
 744     bool sequential = false;
 745     enum pe_ordering flags = pe_order_optional;
 746     enum ordering_symmetry symmetry;
 747 
 748     char *key = NULL;
 749     const char *id = ID(set);
 750     const char *action = crm_element_value(set, "action");
 751     const char *sequential_s = crm_element_value(set, "sequential");
 752     const char *kind_s = crm_element_value(set, XML_ORDER_ATTR_KIND);
 753 
 754     if (action == NULL) {
 755         action = RSC_START;
 756     }
 757 
 758     if (kind_s) {
 759         local_kind = get_ordering_type(set);
 760     }
 761     if (sequential_s == NULL) {
 762         sequential_s = "1";
 763     }
 764 
 765     sequential = crm_is_true(sequential_s);
 766 
 767     symmetry = get_ordering_symmetry(set, parent_kind, parent_symmetrical_s);
 768     flags = ordering_flags_for_kind(local_kind, action, symmetry);
 769 
 770     for (xml_rsc = first_named_child(set, XML_TAG_RESOURCE_REF);
 771          xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 772 
 773         EXPAND_CONSTRAINT_IDREF(id, resource, ID(xml_rsc));
 774         resources = g_list_append(resources, resource);
 775     }
 776 
 777     if (pcmk__list_of_1(resources)) {
 778         crm_trace("Single set: %s", id);
 779         goto done;
 780     }
 781 
 782     set_iter = resources;
 783     while (set_iter != NULL) {
 784         resource = (pe_resource_t *) set_iter->data;
 785         set_iter = set_iter->next;
 786 
 787         key = pcmk__op_key(resource->id, action, 0);
 788 
 789         if (local_kind == pe_order_kind_serialize) {
 790             /* Serialize before everything that comes after */
 791 
 792             for (GList *gIter = set_iter; gIter != NULL; gIter = gIter->next) {
 793                 pe_resource_t *then_rsc = (pe_resource_t *) gIter->data;
 794                 char *then_key = pcmk__op_key(then_rsc->id, action, 0);
 795 
 796                 pcmk__new_ordering(resource, strdup(key), NULL, then_rsc,
 797                                    then_key, NULL, flags, data_set);
 798             }
 799 
 800         } else if (sequential) {
 801             if (last != NULL) {
 802                 pcmk__order_resource_actions(last, action, resource, action,
 803                                              flags, data_set);
 804             }
 805             last = resource;
 806         }
 807         free(key);
 808     }
 809 
 810     if (symmetry == ordering_asymmetric) {
 811         goto done;
 812     }
 813 
 814     last = NULL;
 815     action = invert_action(action);
 816 
 817     flags = ordering_flags_for_kind(local_kind, action,
 818                                     ordering_symmetric_inverse);
 819 
 820     set_iter = resources;
 821     while (set_iter != NULL) {
 822         resource = (pe_resource_t *) set_iter->data;
 823         set_iter = set_iter->next;
 824 
 825         if (sequential) {
 826             if (last != NULL) {
 827                 pcmk__order_resource_actions(resource, action, last, action,
 828                                              flags, data_set);
 829             }
 830             last = resource;
 831         }
 832     }
 833 
 834   done:
 835     g_list_free(resources);
 836     return pcmk_rc_ok;
 837 }
 838 
 839 /*!
 840  * \brief Order two resource sets relative to each other
 841  *
 842  * \param[in] id        Ordering ID (for logging)
 843  * \param[in] set1      First listed set
 844  * \param[in] set2      Second listed set
 845  * \param[in] kind      Ordering kind
 846  * \param[in] data_set  Cluster working set
 847  * \param[in] symmetry  Which ordering symmetry applies to this relation
 848  *
 849  * \return Standard Pacemaker return code
 850  */
 851 static int
 852 order_rsc_sets(const char *id, xmlNode *set1, xmlNode *set2,
     /* [previous][next][first][last][top][bottom][index][help] */
 853                enum pe_order_kind kind, pe_working_set_t *data_set,
 854                enum ordering_symmetry symmetry)
 855 {
 856 
 857     xmlNode *xml_rsc = NULL;
 858     xmlNode *xml_rsc_2 = NULL;
 859 
 860     pe_resource_t *rsc_1 = NULL;
 861     pe_resource_t *rsc_2 = NULL;
 862 
 863     const char *action_1 = crm_element_value(set1, "action");
 864     const char *action_2 = crm_element_value(set2, "action");
 865 
 866     const char *sequential_1 = crm_element_value(set1, "sequential");
 867     const char *sequential_2 = crm_element_value(set2, "sequential");
 868 
 869     const char *require_all_s = crm_element_value(set1, "require-all");
 870     bool require_all = require_all_s? crm_is_true(require_all_s) : true;
 871 
 872     enum pe_ordering flags = pe_order_none;
 873 
 874     if (action_1 == NULL) {
 875         action_1 = RSC_START;
 876     }
 877 
 878     if (action_2 == NULL) {
 879         action_2 = RSC_START;
 880     }
 881 
 882     if (symmetry == ordering_symmetric_inverse) {
 883         action_1 = invert_action(action_1);
 884         action_2 = invert_action(action_2);
 885     }
 886 
 887     if (pcmk__str_eq(RSC_STOP, action_1, pcmk__str_casei)
 888         || pcmk__str_eq(RSC_DEMOTE, action_1, pcmk__str_casei)) {
 889         /* Assuming: A -> ( B || C) -> D
 890          * The one-or-more logic only applies during the start/promote phase.
 891          * During shutdown neither B nor can shutdown until D is down, so simply
 892          * turn require_all back on.
 893          */
 894         require_all = true;
 895     }
 896 
 897     // @TODO is action_2 correct here?
 898     flags = ordering_flags_for_kind(kind, action_2, symmetry);
 899 
 900     /* If we have an unordered set1, whether it is sequential or not is
 901      * irrelevant in regards to set2.
 902      */
 903     if (!require_all) {
 904         char *task = crm_strdup_printf(CRM_OP_RELAXED_SET ":%s", ID(set1));
 905         pe_action_t *unordered_action = get_pseudo_op(task, data_set);
 906 
 907         free(task);
 908         pe__set_action_flags(unordered_action, pe_action_requires_any);
 909 
 910         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 911              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 912 
 913             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 914 
 915             /* Add an ordering constraint between every element in set1 and the
 916              * pseudo action. If any action in set1 is runnable the pseudo
 917              * action will be runnable.
 918              */
 919             pcmk__new_ordering(rsc_1, pcmk__op_key(rsc_1->id, action_1, 0),
 920                                NULL, NULL, NULL, unordered_action,
 921                                pe_order_one_or_more|pe_order_implies_then_printed,
 922                                data_set);
 923         }
 924         for (xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
 925              xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
 926 
 927             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
 928 
 929             /* Add an ordering constraint between the pseudo-action and every
 930              * element in set2. If the pseudo-action is runnable, every action
 931              * in set2 will be runnable.
 932              */
 933             pcmk__new_ordering(NULL, NULL, unordered_action,
 934                                rsc_2, pcmk__op_key(rsc_2->id, action_2, 0),
 935                                NULL, flags|pe_order_runnable_left, data_set);
 936         }
 937 
 938         return pcmk_rc_ok;
 939     }
 940 
 941     if (crm_is_true(sequential_1)) {
 942         if (symmetry == ordering_symmetric_inverse) {
 943             // Get the first one
 944             xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 945             if (xml_rsc != NULL) {
 946                 EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 947             }
 948 
 949         } else {
 950             // Get the last one
 951             const char *rid = NULL;
 952 
 953             for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 954                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 955 
 956                 rid = ID(xml_rsc);
 957             }
 958             EXPAND_CONSTRAINT_IDREF(id, rsc_1, rid);
 959         }
 960     }
 961 
 962     if (crm_is_true(sequential_2)) {
 963         if (symmetry == ordering_symmetric_inverse) {
 964             // Get the last one
 965             const char *rid = NULL;
 966 
 967             for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 968                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 969 
 970                 rid = ID(xml_rsc);
 971             }
 972             EXPAND_CONSTRAINT_IDREF(id, rsc_2, rid);
 973 
 974         } else {
 975             // Get the first one
 976             xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 977             if (xml_rsc != NULL) {
 978                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 979             }
 980         }
 981     }
 982 
 983     if ((rsc_1 != NULL) && (rsc_2 != NULL)) {
 984         pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2, flags,
 985                                      data_set);
 986 
 987     } else if (rsc_1 != NULL) {
 988         for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 989              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 990 
 991             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 992             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
 993                                          flags, data_set);
 994         }
 995 
 996     } else if (rsc_2 != NULL) {
 997         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 998              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 999 
1000             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
1001             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
1002                                          flags, data_set);
1003         }
1004 
1005     } else {
1006         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
1007              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
1008 
1009             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
1010 
1011             for (xmlNode *xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
1012                  xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
1013 
1014                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
1015                 pcmk__order_resource_actions(rsc_1, action_1, rsc_2,
1016                                              action_2, flags, data_set);
1017             }
1018         }
1019     }
1020 
1021     return pcmk_rc_ok;
1022 }
1023 
1024 /*!
1025  * \internal
1026  * \brief If an ordering constraint uses resource tags, expand them
1027  *
1028  * \param[in]  xml_obj       Ordering constraint XML
1029  * \param[out] expanded_xml  Equivalent XML with tags expanded
1030  * \param[in]  data_set      Cluster working set
1031  *
1032  * \return Standard Pacemaker return code (specifically, pcmk_rc_ok on success,
1033  *         and pcmk_rc_schema_validation on invalid configuration)
1034  */
1035 static int
1036 unpack_order_tags(xmlNode *xml_obj, xmlNode **expanded_xml,
     /* [previous][next][first][last][top][bottom][index][help] */
1037                   pe_working_set_t *data_set)
1038 {
1039     const char *id_first = NULL;
1040     const char *id_then = NULL;
1041     const char *action_first = NULL;
1042     const char *action_then = NULL;
1043 
1044     pe_resource_t *rsc_first = NULL;
1045     pe_resource_t *rsc_then = NULL;
1046     pe_tag_t *tag_first = NULL;
1047     pe_tag_t *tag_then = NULL;
1048 
1049     xmlNode *rsc_set_first = NULL;
1050     xmlNode *rsc_set_then = NULL;
1051     bool any_sets = false;
1052 
1053     // Check whether there are any resource sets with template or tag references
1054     *expanded_xml = pcmk__expand_tags_in_sets(xml_obj, data_set);
1055     if (*expanded_xml != NULL) {
1056         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
1057         return pcmk_rc_ok;
1058     }
1059 
1060     id_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST);
1061     id_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN);
1062     if ((id_first == NULL) || (id_then == NULL)) {
1063         return pcmk_rc_ok;
1064     }
1065 
1066     if (!pcmk__valid_resource_or_tag(data_set, id_first, &rsc_first,
1067                                      &tag_first)) {
1068         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
1069                          "valid resource or tag", ID(xml_obj), id_first);
1070         return pcmk_rc_schema_validation;
1071     }
1072 
1073     if (!pcmk__valid_resource_or_tag(data_set, id_then, &rsc_then, &tag_then)) {
1074         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
1075                          "valid resource or tag", ID(xml_obj), id_then);
1076         return pcmk_rc_schema_validation;
1077     }
1078 
1079     if ((rsc_first != NULL) && (rsc_then != NULL)) {
1080         // Neither side references a template or tag
1081         return pcmk_rc_ok;
1082     }
1083 
1084     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
1085     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
1086 
1087     *expanded_xml = copy_xml(xml_obj);
1088 
1089     // Convert template/tag reference in "first" into resource_set under constraint
1090     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_first, XML_ORDER_ATTR_FIRST,
1091                           true, data_set)) {
1092         free_xml(*expanded_xml);
1093         *expanded_xml = NULL;
1094         return pcmk_rc_schema_validation;
1095     }
1096 
1097     if (rsc_set_first != NULL) {
1098         if (action_first != NULL) {
1099             // Move "first-action" into converted resource_set as "action"
1100             crm_xml_add(rsc_set_first, "action", action_first);
1101             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_FIRST_ACTION);
1102         }
1103         any_sets = true;
1104     }
1105 
1106     // Convert template/tag reference in "then" into resource_set under constraint
1107     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_then, XML_ORDER_ATTR_THEN,
1108                           true, data_set)) {
1109         free_xml(*expanded_xml);
1110         *expanded_xml = NULL;
1111         return pcmk_rc_schema_validation;
1112     }
1113 
1114     if (rsc_set_then != NULL) {
1115         if (action_then != NULL) {
1116             // Move "then-action" into converted resource_set as "action"
1117             crm_xml_add(rsc_set_then, "action", action_then);
1118             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_THEN_ACTION);
1119         }
1120         any_sets = true;
1121     }
1122 
1123     if (any_sets) {
1124         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
1125     } else {
1126         free_xml(*expanded_xml);
1127         *expanded_xml = NULL;
1128     }
1129 
1130     return pcmk_rc_ok;
1131 }
1132 
1133 /*!
1134  * \internal
1135  * \brief Unpack ordering constraint XML
1136  *
1137  * \param[in]     xml_obj   Ordering constraint XML to unpack
1138  * \param[in,out] data_set  Cluster working set
1139  */
1140 void
1141 pcmk__unpack_ordering(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1142 {
1143     xmlNode *set = NULL;
1144     xmlNode *last = NULL;
1145 
1146     xmlNode *orig_xml = NULL;
1147     xmlNode *expanded_xml = NULL;
1148 
1149     const char *id = crm_element_value(xml_obj, XML_ATTR_ID);
1150     const char *invert = crm_element_value(xml_obj, XML_CONS_ATTR_SYMMETRICAL);
1151     enum pe_order_kind kind = get_ordering_type(xml_obj);
1152 
1153     enum ordering_symmetry symmetry = get_ordering_symmetry(xml_obj, kind,
1154                                                             NULL);
1155 
1156     // Expand any resource tags in the constraint XML
1157     if (unpack_order_tags(xml_obj, &expanded_xml, data_set) != pcmk_rc_ok) {
1158         return;
1159     }
1160     if (expanded_xml != NULL) {
1161         orig_xml = xml_obj;
1162         xml_obj = expanded_xml;
1163     }
1164 
1165     // If the constraint has resource sets, unpack them
1166     for (set = first_named_child(xml_obj, XML_CONS_TAG_RSC_SET);
1167          set != NULL; set = crm_next_same_xml(set)) {
1168 
1169         set = expand_idref(set, data_set->input);
1170         if ((set == NULL) // Configuration error, message already logged
1171             || (unpack_order_set(set, kind, invert, data_set) != pcmk_rc_ok)) {
1172 
1173             if (expanded_xml != NULL) {
1174                 free_xml(expanded_xml);
1175             }
1176             return;
1177         }
1178 
1179         if (last != NULL) {
1180 
1181             if (order_rsc_sets(id, last, set, kind, data_set,
1182                                symmetry) != pcmk_rc_ok) {
1183                 if (expanded_xml != NULL) {
1184                     free_xml(expanded_xml);
1185                 }
1186                 return;
1187             }
1188 
1189             if ((symmetry == ordering_symmetric)
1190                 && (order_rsc_sets(id, set, last, kind, data_set,
1191                                    ordering_symmetric_inverse) != pcmk_rc_ok)) {
1192                 if (expanded_xml != NULL) {
1193                     free_xml(expanded_xml);
1194                 }
1195                 return;
1196             }
1197 
1198         }
1199         last = set;
1200     }
1201 
1202     if (expanded_xml) {
1203         free_xml(expanded_xml);
1204         xml_obj = orig_xml;
1205     }
1206 
1207     // If the constraint has no resource sets, unpack it as a simple ordering
1208     if (last == NULL) {
1209         return unpack_simple_rsc_order(xml_obj, data_set);
1210     }
1211 }
1212 
1213 static bool
1214 ordering_is_invalid(pe_action_t *action, pe_action_wrapper_t *input)
     /* [previous][next][first][last][top][bottom][index][help] */
1215 {
1216     /* Prevent user-defined ordering constraints between resources
1217      * running in a guest node and the resource that defines that node.
1218      */
1219     if (!pcmk_is_set(input->type, pe_order_preserve)
1220         && (input->action->rsc != NULL)
1221         && pcmk__rsc_corresponds_to_guest(action->rsc, input->action->node)) {
1222 
1223         crm_warn("Invalid ordering constraint between %s and %s",
1224                  input->action->rsc->id, action->rsc->id);
1225         return true;
1226     }
1227 
1228     /* If there's an order like
1229      * "rscB_stop node2"-> "load_stopped_node2" -> "rscA_migrate_to node1"
1230      *
1231      * then rscA is being migrated from node1 to node2, while rscB is being
1232      * migrated from node2 to node1. If there would be a graph loop,
1233      * break the order "load_stopped_node2" -> "rscA_migrate_to node1".
1234      */
1235     if ((input->type == pe_order_load) && action->rsc
1236         && pcmk__str_eq(action->task, RSC_MIGRATE, pcmk__str_casei)
1237         && pcmk__graph_has_loop(action, action, input)) {
1238         return true;
1239     }
1240 
1241     return false;
1242 }
1243 
1244 void
1245 pcmk__disable_invalid_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1246 {
1247     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
1248         pe_action_t *action = (pe_action_t *) iter->data;
1249         pe_action_wrapper_t *input = NULL;
1250 
1251         for (GList *input_iter = action->actions_before;
1252              input_iter != NULL; input_iter = input_iter->next) {
1253 
1254             input = (pe_action_wrapper_t *) input_iter->data;
1255             if (ordering_is_invalid(action, input)) {
1256                 input->type = pe_order_none;
1257             }
1258         }
1259     }
1260 }
1261 
1262 /*!
1263  * \internal
1264  * \brief Order stops on a node before the node's shutdown
1265  *
1266  * \param[in] node         Node being shut down
1267  * \param[in] shutdown_op  Shutdown action for node
1268  * \param[in] data_set     Cluster working set
1269  */
1270 void
1271 pcmk__order_stops_before_shutdown(pe_node_t *node, pe_action_t *shutdown_op,
     /* [previous][next][first][last][top][bottom][index][help] */
1272                                   pe_working_set_t *data_set)
1273 {
1274     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
1275         pe_action_t *action = (pe_action_t *) iter->data;
1276 
1277         // Only stops on the node shutting down are relevant
1278         if ((action->rsc == NULL) || (action->node == NULL)
1279             || (action->node->details != node->details)
1280             || !pcmk__str_eq(action->task, RSC_STOP, pcmk__str_casei)) {
1281             continue;
1282         }
1283 
1284         // Resources and nodes in maintenance mode won't be touched
1285 
1286         if (pcmk_is_set(action->rsc->flags, pe_rsc_maintenance)) {
1287             pe_rsc_trace(action->rsc,
1288                          "Not ordering %s before %s shutdown because "
1289                          "resource in maintenance mode",
1290                          action->uuid, node->details->uname);
1291             continue;
1292 
1293         } else if (node->details->maintenance) {
1294             pe_rsc_trace(action->rsc,
1295                          "Not ordering %s before %s shutdown because "
1296                          "node in maintenance mode",
1297                          action->uuid, node->details->uname);
1298             continue;
1299         }
1300 
1301         /* Don't touch a resource that is unmanaged or blocked, to avoid
1302          * blocking the shutdown (though if another action depends on this one,
1303          * we may still end up blocking)
1304          */
1305         if (!pcmk_any_flags_set(action->rsc->flags,
1306                                 pe_rsc_managed|pe_rsc_block)) {
1307             pe_rsc_trace(action->rsc,
1308                          "Not ordering %s before %s shutdown because "
1309                          "resource is unmanaged or blocked",
1310                          action->uuid, node->details->uname);
1311             continue;
1312         }
1313 
1314         pe_rsc_trace(action->rsc, "Ordering %s before %s shutdown",
1315                      action->uuid, node->details->uname);
1316         pe__clear_action_flags(action, pe_action_optional);
1317         pcmk__new_ordering(action->rsc, NULL, action, NULL,
1318                            strdup(CRM_OP_SHUTDOWN), shutdown_op,
1319                            pe_order_optional|pe_order_runnable_left, data_set);
1320     }
1321 }
1322 
1323 /*!
1324  * \brief Find resource actions matching directly or as child
1325  *
1326  * \param[in] rsc           Resource to check
1327  * \param[in] original_key  Action key to search for (possibly referencing
1328  *                          parent of \rsc)
1329  *
1330  * \return Newly allocated list of matching actions
1331  * \note It is the caller's responsibility to free the result with g_list_free()
1332  */
1333 static GList *
1334 find_actions_by_task(pe_resource_t *rsc, const char *original_key)
     /* [previous][next][first][last][top][bottom][index][help] */
1335 {
1336     // Search under given task key directly
1337     GList *list = find_actions(rsc->actions, original_key, NULL);
1338 
1339     if (list == NULL) {
1340         // Search again using this resource's ID
1341         char *key = NULL;
1342         char *task = NULL;
1343         guint interval_ms = 0;
1344 
1345         if (parse_op_key(original_key, NULL, &task, &interval_ms)) {
1346             key = pcmk__op_key(rsc->id, task, interval_ms);
1347             list = find_actions(rsc->actions, key, NULL);
1348             free(key);
1349             free(task);
1350         } else {
1351             crm_err("Invalid operation key (bug?): %s", original_key);
1352         }
1353     }
1354     return list;
1355 }
1356 
1357 static void
1358 rsc_order_then(pe_action_t *lh_action, pe_resource_t *rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
1359                pe__ordering_t *order)
1360 {
1361     GList *rh_actions = NULL;
1362     pe_action_t *rh_action = NULL;
1363     enum pe_ordering type;
1364 
1365     CRM_CHECK(rsc != NULL, return);
1366     CRM_CHECK(order != NULL, return);
1367 
1368     type = order->type;
1369     rh_action = order->rh_action;
1370     crm_trace("Applying ordering constraint %d (then: %s)", order->id, rsc->id);
1371 
1372     if (rh_action != NULL) {
1373         rh_actions = g_list_prepend(NULL, rh_action);
1374 
1375     } else if (rsc != NULL) {
1376         rh_actions = find_actions_by_task(rsc, order->rh_action_task);
1377     }
1378 
1379     if (rh_actions == NULL) {
1380         pe_rsc_trace(rsc,
1381                      "Ignoring constraint %d: then (%s for %s) not found",
1382                      order->id, order->rh_action_task, rsc->id);
1383         return;
1384     }
1385 
1386     if ((lh_action != NULL) && (lh_action->rsc == rsc)
1387         && pcmk_is_set(lh_action->flags, pe_action_dangle)) {
1388 
1389         pe_rsc_trace(rsc, "Detected dangling operation %s -> %s",
1390                      lh_action->uuid, order->rh_action_task);
1391         pe__clear_order_flags(type, pe_order_implies_then);
1392     }
1393 
1394     for (GList *gIter = rh_actions; gIter != NULL; gIter = gIter->next) {
1395         pe_action_t *rh_action_iter = (pe_action_t *) gIter->data;
1396 
1397         if (lh_action) {
1398             order_actions(lh_action, rh_action_iter, type);
1399 
1400         } else if (type & pe_order_implies_then) {
1401             pe__clear_action_flags(rh_action_iter, pe_action_runnable);
1402             crm_warn("Unrunnable %s 0x%.6x", rh_action_iter->uuid, type);
1403         } else {
1404             crm_warn("neither %s 0x%.6x", rh_action_iter->uuid, type);
1405         }
1406     }
1407 
1408     g_list_free(rh_actions);
1409 }
1410 
1411 static void
1412 rsc_order_first(pe_resource_t *lh_rsc, pe__ordering_t *order,
     /* [previous][next][first][last][top][bottom][index][help] */
1413                 pe_working_set_t *data_set)
1414 {
1415     GList *lh_actions = NULL;
1416     pe_action_t *lh_action = order->lh_action;
1417     pe_resource_t *rh_rsc = order->rh_rsc;
1418 
1419     CRM_ASSERT(lh_rsc != NULL);
1420     pe_rsc_trace(lh_rsc, "Applying ordering constraint %d (first: %s)",
1421                  order->id, lh_rsc->id);
1422 
1423     if (lh_action != NULL) {
1424         lh_actions = g_list_prepend(NULL, lh_action);
1425 
1426     } else {
1427         lh_actions = find_actions_by_task(lh_rsc, order->lh_action_task);
1428     }
1429 
1430     if ((lh_actions == NULL) && (lh_rsc == rh_rsc)) {
1431         pe_rsc_trace(lh_rsc,
1432                      "Ignoring constraint %d: first (%s for %s) not found",
1433                      order->id, order->lh_action_task, lh_rsc->id);
1434 
1435     } else if (lh_actions == NULL) {
1436         char *key = NULL;
1437         char *op_type = NULL;
1438         guint interval_ms = 0;
1439 
1440         parse_op_key(order->lh_action_task, NULL, &op_type, &interval_ms);
1441         key = pcmk__op_key(lh_rsc->id, op_type, interval_ms);
1442 
1443         if ((lh_rsc->fns->state(lh_rsc, TRUE) == RSC_ROLE_STOPPED)
1444             && pcmk__str_eq(op_type, RSC_STOP, pcmk__str_casei)) {
1445             free(key);
1446             pe_rsc_trace(lh_rsc,
1447                          "Ignoring constraint %d: first (%s for %s) not found",
1448                          order->id, order->lh_action_task, lh_rsc->id);
1449 
1450         } else if ((lh_rsc->fns->state(lh_rsc, TRUE) == RSC_ROLE_UNPROMOTED)
1451                    && pcmk__str_eq(op_type, RSC_DEMOTE, pcmk__str_casei)) {
1452             free(key);
1453             pe_rsc_trace(lh_rsc,
1454                          "Ignoring constraint %d: first (%s for %s) not found",
1455                          order->id, order->lh_action_task, lh_rsc->id);
1456 
1457         } else {
1458             pe_rsc_trace(lh_rsc,
1459                          "Creating first (%s for %s) for constraint %d ",
1460                          order->lh_action_task, lh_rsc->id, order->id);
1461             lh_action = custom_action(lh_rsc, key, op_type, NULL, TRUE, TRUE, data_set);
1462             lh_actions = g_list_prepend(NULL, lh_action);
1463         }
1464 
1465         free(op_type);
1466     }
1467 
1468     if (rh_rsc == NULL) {
1469         if (order->rh_action == NULL) {
1470             pe_rsc_trace(lh_rsc, "Ignoring constraint %d: then not found",
1471                          order->id);
1472             return;
1473         }
1474         rh_rsc = order->rh_action->rsc;
1475     }
1476     for (GList *gIter = lh_actions; gIter != NULL; gIter = gIter->next) {
1477         lh_action = (pe_action_t *) gIter->data;
1478 
1479         if (rh_rsc == NULL) {
1480             order_actions(lh_action, order->rh_action, order->type);
1481 
1482         } else {
1483             rsc_order_then(lh_action, rh_rsc, order);
1484         }
1485     }
1486 
1487     g_list_free(lh_actions);
1488 }
1489 
1490 void
1491 pcmk__apply_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1492 {
1493     crm_trace("Applying ordering constraints");
1494 
1495     /* Don't ask me why, but apparently they need to be processed in
1496      * the order they were created in... go figure
1497      *
1498      * Also g_list_append() has horrendous performance characteristics
1499      * So we need to use g_list_prepend() and then reverse the list here
1500      */
1501     data_set->ordering_constraints = g_list_reverse(data_set->ordering_constraints);
1502 
1503     for (GList *gIter = data_set->ordering_constraints;
1504          gIter != NULL; gIter = gIter->next) {
1505 
1506         pe__ordering_t *order = gIter->data;
1507         pe_resource_t *rsc = order->lh_rsc;
1508 
1509         if (rsc != NULL) {
1510             rsc_order_first(rsc, order, data_set);
1511             continue;
1512         }
1513 
1514         rsc = order->rh_rsc;
1515         if (rsc != NULL) {
1516             rsc_order_then(order->lh_action, rsc, order);
1517 
1518         } else {
1519             crm_trace("Applying ordering constraint %d (non-resource actions)",
1520                       order->id);
1521             order_actions(order->lh_action, order->rh_action, order->type);
1522         }
1523     }
1524 
1525     g_list_foreach(data_set->actions, (GFunc) pcmk__block_colocated_starts,
1526                    data_set);
1527 
1528     crm_trace("Ordering probes");
1529     pcmk__order_probes(data_set);
1530 
1531     crm_trace("Updating %d actions", g_list_length(data_set->actions));
1532     g_list_foreach(data_set->actions, (GFunc) update_action, data_set);
1533 
1534     pcmk__disable_invalid_orderings(data_set);
1535 }

/* [previous][next][first][last][top][bottom][index][help] */