root/lib/pacemaker/pcmk_sched_ordering.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. invert_action
  2. get_ordering_type
  3. get_ordering_symmetry
  4. ordering_flags_for_kind
  5. get_ordering_resource
  6. get_minimum_first_instances
  7. clone_min_ordering
  8. inverse_ordering
  9. unpack_simple_rsc_order
  10. pcmk__new_ordering
  11. unpack_order_set
  12. order_rsc_sets
  13. unpack_order_tags
  14. pcmk__unpack_ordering
  15. ordering_is_invalid
  16. pcmk__disable_invalid_orderings
  17. pcmk__order_stops_before_shutdown
  18. find_actions_by_task
  19. order_resource_actions_after
  20. rsc_order_first
  21. pcmk__apply_orderings
  22. pcmk__order_after_each
  23. pcmk__promotable_restart_ordering

   1 /*
   2  * Copyright 2004-2023 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <inttypes.h>               // PRIx32
  13 #include <stdbool.h>
  14 #include <glib.h>
  15 
  16 #include <crm/crm.h>
  17 #include <pacemaker-internal.h>
  18 #include "libpacemaker_private.h"
  19 
  20 enum pe_order_kind {
  21     pe_order_kind_optional,
  22     pe_order_kind_mandatory,
  23     pe_order_kind_serialize,
  24 };
  25 
  26 enum ordering_symmetry {
  27     ordering_asymmetric,        // the only relation in an asymmetric ordering
  28     ordering_symmetric,         // the normal relation in a symmetric ordering
  29     ordering_symmetric_inverse, // the inverse relation in a symmetric ordering
  30 };
  31 
  32 #define EXPAND_CONSTRAINT_IDREF(__set, __rsc, __name) do {                      \
  33         __rsc = pcmk__find_constraint_resource(data_set->resources, __name);    \
  34         if (__rsc == NULL) {                                                    \
  35             pcmk__config_err("%s: No resource found for %s", __set, __name);    \
  36             return pcmk_rc_unpack_error;                                        \
  37         }                                                                       \
  38     } while (0)
  39 
  40 static const char *
  41 invert_action(const char *action)
     /* [previous][next][first][last][top][bottom][index][help] */
  42 {
  43     if (pcmk__str_eq(action, RSC_START, pcmk__str_casei)) {
  44         return RSC_STOP;
  45 
  46     } else if (pcmk__str_eq(action, RSC_STOP, pcmk__str_casei)) {
  47         return RSC_START;
  48 
  49     } else if (pcmk__str_eq(action, RSC_PROMOTE, pcmk__str_casei)) {
  50         return RSC_DEMOTE;
  51 
  52     } else if (pcmk__str_eq(action, RSC_DEMOTE, pcmk__str_casei)) {
  53         return RSC_PROMOTE;
  54 
  55     } else if (pcmk__str_eq(action, RSC_PROMOTED, pcmk__str_casei)) {
  56         return RSC_DEMOTED;
  57 
  58     } else if (pcmk__str_eq(action, RSC_DEMOTED, pcmk__str_casei)) {
  59         return RSC_PROMOTED;
  60 
  61     } else if (pcmk__str_eq(action, RSC_STARTED, pcmk__str_casei)) {
  62         return RSC_STOPPED;
  63 
  64     } else if (pcmk__str_eq(action, RSC_STOPPED, pcmk__str_casei)) {
  65         return RSC_STARTED;
  66     }
  67     crm_warn("Unknown action '%s' specified in order constraint", action);
  68     return NULL;
  69 }
  70 
  71 static enum pe_order_kind
  72 get_ordering_type(const xmlNode *xml_obj)
     /* [previous][next][first][last][top][bottom][index][help] */
  73 {
  74     enum pe_order_kind kind_e = pe_order_kind_mandatory;
  75     const char *kind = crm_element_value(xml_obj, XML_ORDER_ATTR_KIND);
  76 
  77     if (kind == NULL) {
  78         const char *score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
  79 
  80         kind_e = pe_order_kind_mandatory;
  81 
  82         if (score) {
  83             // @COMPAT deprecated informally since 1.0.7, formally since 2.0.1
  84             int score_i = char2score(score);
  85 
  86             if (score_i == 0) {
  87                 kind_e = pe_order_kind_optional;
  88             }
  89             pe_warn_once(pe_wo_order_score,
  90                          "Support for 'score' in rsc_order is deprecated "
  91                          "and will be removed in a future release "
  92                          "(use 'kind' instead)");
  93         }
  94 
  95     } else if (pcmk__str_eq(kind, "Mandatory", pcmk__str_casei)) {
  96         kind_e = pe_order_kind_mandatory;
  97 
  98     } else if (pcmk__str_eq(kind, "Optional", pcmk__str_casei)) {
  99         kind_e = pe_order_kind_optional;
 100 
 101     } else if (pcmk__str_eq(kind, "Serialize", pcmk__str_casei)) {
 102         kind_e = pe_order_kind_serialize;
 103 
 104     } else {
 105         pcmk__config_err("Resetting '" XML_ORDER_ATTR_KIND "' for constraint "
 106                          "%s to 'Mandatory' because '%s' is not valid",
 107                          pcmk__s(ID(xml_obj), "missing ID"), kind);
 108     }
 109     return kind_e;
 110 }
 111 
 112 /*!
 113  * \internal
 114  * \brief Get ordering symmetry from XML
 115  *
 116  * \param[in] xml_obj               Ordering XML
 117  * \param[in] parent_kind           Default ordering kind
 118  * \param[in] parent_symmetrical_s  Parent element's symmetrical setting, if any
 119  *
 120  * \retval ordering_symmetric   Ordering is symmetric
 121  * \retval ordering_asymmetric  Ordering is asymmetric
 122  */
 123 static enum ordering_symmetry
 124 get_ordering_symmetry(const xmlNode *xml_obj, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 125                       const char *parent_symmetrical_s)
 126 {
 127     int rc = pcmk_rc_ok;
 128     bool symmetric = false;
 129     enum pe_order_kind kind = parent_kind; // Default to parent's kind
 130 
 131     // Check ordering XML for explicit kind
 132     if ((crm_element_value(xml_obj, XML_ORDER_ATTR_KIND) != NULL)
 133         || (crm_element_value(xml_obj, XML_RULE_ATTR_SCORE) != NULL)) {
 134         kind = get_ordering_type(xml_obj);
 135     }
 136 
 137     // Check ordering XML (and parent) for explicit symmetrical setting
 138     rc = pcmk__xe_get_bool_attr(xml_obj, XML_CONS_ATTR_SYMMETRICAL, &symmetric);
 139 
 140     if (rc != pcmk_rc_ok && parent_symmetrical_s != NULL) {
 141         symmetric = crm_is_true(parent_symmetrical_s);
 142         rc = pcmk_rc_ok;
 143     }
 144 
 145     if (rc == pcmk_rc_ok) {
 146         if (symmetric) {
 147             if (kind == pe_order_kind_serialize) {
 148                 pcmk__config_warn("Ignoring " XML_CONS_ATTR_SYMMETRICAL
 149                                   " for '%s' because not valid with "
 150                                   XML_ORDER_ATTR_KIND " of 'Serialize'",
 151                                   ID(xml_obj));
 152             } else {
 153                 return ordering_symmetric;
 154             }
 155         }
 156         return ordering_asymmetric;
 157     }
 158 
 159     // Use default symmetry
 160     if (kind == pe_order_kind_serialize) {
 161         return ordering_asymmetric;
 162     }
 163     return ordering_symmetric;
 164 }
 165 
 166 /*!
 167  * \internal
 168  * \brief Get ordering flags appropriate to ordering kind
 169  *
 170  * \param[in] kind      Ordering kind
 171  * \param[in] first     Action name for 'first' action
 172  * \param[in] symmetry  This ordering's symmetry role
 173  *
 174  * \return Minimal ordering flags appropriate to \p kind
 175  */
 176 static uint32_t
 177 ordering_flags_for_kind(enum pe_order_kind kind, const char *first,
     /* [previous][next][first][last][top][bottom][index][help] */
 178                         enum ordering_symmetry symmetry)
 179 {
 180     uint32_t flags = pe_order_none; // so we trace-log all flags set
 181 
 182     pe__set_order_flags(flags, pe_order_optional);
 183 
 184     switch (kind) {
 185         case pe_order_kind_optional:
 186             break;
 187 
 188         case pe_order_kind_serialize:
 189             pe__set_order_flags(flags, pe_order_serialize_only);
 190             break;
 191 
 192         case pe_order_kind_mandatory:
 193             switch (symmetry) {
 194                 case ordering_asymmetric:
 195                     pe__set_order_flags(flags, pe_order_asymmetrical);
 196                     break;
 197 
 198                 case ordering_symmetric:
 199                     pe__set_order_flags(flags, pe_order_implies_then);
 200                     if (pcmk__strcase_any_of(first, RSC_START, RSC_PROMOTE,
 201                                              NULL)) {
 202                         pe__set_order_flags(flags, pe_order_runnable_left);
 203                     }
 204                     break;
 205 
 206                 case ordering_symmetric_inverse:
 207                     pe__set_order_flags(flags, pe_order_implies_first);
 208                     break;
 209             }
 210             break;
 211     }
 212     return flags;
 213 }
 214 
 215 /*!
 216  * \internal
 217  * \brief Find resource corresponding to ID specified in ordering
 218  *
 219  * \param[in] xml            Ordering XML
 220  * \param[in] resource_attr  XML attribute name for resource ID
 221  * \param[in] instance_attr  XML attribute name for instance number.
 222  *                           This option is deprecated and will be removed in a
 223  *                           future release.
 224  * \param[in] data_set       Cluster working set
 225  *
 226  * \return Resource corresponding to \p id, or NULL if none
 227  */
 228 static pe_resource_t *
 229 get_ordering_resource(const xmlNode *xml, const char *resource_attr,
     /* [previous][next][first][last][top][bottom][index][help] */
 230                       const char *instance_attr,
 231                       const pe_working_set_t *data_set)
 232 {
 233     // @COMPAT: instance_attr and instance_id variables deprecated since 2.1.5
 234     pe_resource_t *rsc = NULL;
 235     const char *rsc_id = crm_element_value(xml, resource_attr);
 236     const char *instance_id = crm_element_value(xml, instance_attr);
 237 
 238     if (rsc_id == NULL) {
 239         pcmk__config_err("Ignoring constraint '%s' without %s",
 240                          ID(xml), resource_attr);
 241         return NULL;
 242     }
 243 
 244     rsc = pcmk__find_constraint_resource(data_set->resources, rsc_id);
 245     if (rsc == NULL) {
 246         pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 247                          "does not exist", ID(xml), rsc_id);
 248         return NULL;
 249     }
 250 
 251     if (instance_id != NULL) {
 252         pe_warn_once(pe_wo_order_inst,
 253                      "Support for " XML_ORDER_ATTR_FIRST_INSTANCE " and "
 254                      XML_ORDER_ATTR_THEN_INSTANCE " is deprecated and will be "
 255                      "removed in a future release.");
 256 
 257         if (!pe_rsc_is_clone(rsc)) {
 258             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 259                              "is not a clone but instance '%s' was requested",
 260                              ID(xml), rsc_id, instance_id);
 261             return NULL;
 262         }
 263         rsc = find_clone_instance(rsc, instance_id);
 264         if (rsc == NULL) {
 265             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 266                              "does not have an instance '%s'",
 267                              "'%s'", ID(xml), rsc_id, instance_id);
 268             return NULL;
 269         }
 270     }
 271     return rsc;
 272 }
 273 
 274 /*!
 275  * \internal
 276  * \brief Determine minimum number of 'first' instances required in ordering
 277  *
 278  * \param[in] rsc  'First' resource in ordering
 279  * \param[in] xml  Ordering XML
 280  *
 281  * \return Minimum 'first' instances required (or 0 if not applicable)
 282  */
 283 static int
 284 get_minimum_first_instances(const pe_resource_t *rsc, const xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 285 {
 286     const char *clone_min = NULL;
 287     bool require_all = false;
 288 
 289     if (!pe_rsc_is_clone(rsc)) {
 290         return 0;
 291     }
 292 
 293     clone_min = g_hash_table_lookup(rsc->meta,
 294                                     XML_RSC_ATTR_INCARNATION_MIN);
 295     if (clone_min != NULL) {
 296         int clone_min_int = 0;
 297 
 298         pcmk__scan_min_int(clone_min, &clone_min_int, 0);
 299         return clone_min_int;
 300     }
 301 
 302     /* @COMPAT 1.1.13:
 303      * require-all=false is deprecated equivalent of clone-min=1
 304      */
 305     if (pcmk__xe_get_bool_attr(xml, "require-all", &require_all) != ENODATA) {
 306         pe_warn_once(pe_wo_require_all,
 307                      "Support for require-all in ordering constraints "
 308                      "is deprecated and will be removed in a future release"
 309                      " (use clone-min clone meta-attribute instead)");
 310         if (!require_all) {
 311             return 1;
 312         }
 313     }
 314 
 315     return 0;
 316 }
 317 
 318 /*!
 319  * \internal
 320  * \brief Create orderings for a constraint with clone-min > 0
 321  *
 322  * \param[in]     id            Ordering ID
 323  * \param[in,out] rsc_first     'First' resource in ordering (a clone)
 324  * \param[in]     action_first  'First' action in ordering
 325  * \param[in]     rsc_then      'Then' resource in ordering
 326  * \param[in]     action_then   'Then' action in ordering
 327  * \param[in]     flags         Ordering flags
 328  * \param[in]     clone_min     Minimum required instances of 'first'
 329  * \param[in,out] data_set      Cluster working set
 330  */
 331 static void
 332 clone_min_ordering(const char *id,
     /* [previous][next][first][last][top][bottom][index][help] */
 333                    pe_resource_t *rsc_first, const char *action_first,
 334                    pe_resource_t *rsc_then, const char *action_then,
 335                    uint32_t flags, int clone_min, pe_working_set_t *data_set)
 336 {
 337     // Create a pseudo-action for when the minimum instances are active
 338     char *task = crm_strdup_printf(CRM_OP_RELAXED_CLONE ":%s", id);
 339     pe_action_t *clone_min_met = get_pseudo_op(task, data_set);
 340 
 341     free(task);
 342 
 343     /* Require the pseudo-action to have the required number of actions to be
 344      * considered runnable before allowing the pseudo-action to be runnable.
 345      */
 346     clone_min_met->required_runnable_before = clone_min;
 347     pe__set_action_flags(clone_min_met, pe_action_requires_any);
 348 
 349     // Order the actions for each clone instance before the pseudo-action
 350     for (GList *rIter = rsc_first->children; rIter != NULL;
 351          rIter = rIter->next) {
 352 
 353         pe_resource_t *child = rIter->data;
 354 
 355         pcmk__new_ordering(child, pcmk__op_key(child->id, action_first, 0),
 356                            NULL, NULL, NULL, clone_min_met,
 357                            pe_order_one_or_more|pe_order_implies_then_printed,
 358                            data_set);
 359     }
 360 
 361     // Order "then" action after the pseudo-action (if runnable)
 362     pcmk__new_ordering(NULL, NULL, clone_min_met, rsc_then,
 363                        pcmk__op_key(rsc_then->id, action_then, 0),
 364                        NULL, flags|pe_order_runnable_left, data_set);
 365 }
 366 
 367 /*!
 368  * \internal
 369  * \brief Update ordering flags for restart-type=restart
 370  *
 371  * \param[in]     rsc    'Then' resource in ordering
 372  * \param[in]     kind   Ordering kind
 373  * \param[in]     flag   Ordering flag to set (when applicable)
 374  * \param[in,out] flags  Ordering flag set to update
 375  *
 376  * \compat The restart-type resource meta-attribute is deprecated. Eventually,
 377  *         it will be removed, and pe_restart_ignore will be the only behavior,
 378  *         at which time this can just be removed entirely.
 379  */
 380 #define handle_restart_type(rsc, kind, flag, flags) do {        \
 381         if (((kind) == pe_order_kind_optional)                  \
 382             && ((rsc)->restart_type == pe_restart_restart)) {   \
 383             pe__set_order_flags((flags), (flag));               \
 384         }                                                       \
 385     } while (0)
 386 
 387 /*!
 388  * \internal
 389  * \brief Create new ordering for inverse of symmetric constraint
 390  *
 391  * \param[in]     id            Ordering ID (for logging only)
 392  * \param[in]     kind          Ordering kind
 393  * \param[in]     rsc_first     'First' resource in ordering (a clone)
 394  * \param[in]     action_first  'First' action in ordering
 395  * \param[in,out] rsc_then      'Then' resource in ordering
 396  * \param[in]     action_then   'Then' action in ordering
 397  */
 398 static void
 399 inverse_ordering(const char *id, enum pe_order_kind kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 400                  pe_resource_t *rsc_first, const char *action_first,
 401                  pe_resource_t *rsc_then, const char *action_then)
 402 {
 403     action_then = invert_action(action_then);
 404     action_first = invert_action(action_first);
 405     if ((action_then == NULL) || (action_first == NULL)) {
 406         pcmk__config_warn("Cannot invert constraint '%s' "
 407                           "(please specify inverse manually)", id);
 408     } else {
 409         uint32_t flags = ordering_flags_for_kind(kind, action_first,
 410                                                  ordering_symmetric_inverse);
 411 
 412         handle_restart_type(rsc_then, kind, pe_order_implies_first, flags);
 413         pcmk__order_resource_actions(rsc_then, action_then, rsc_first,
 414                                      action_first, flags);
 415     }
 416 }
 417 
 418 static void
 419 unpack_simple_rsc_order(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 420 {
 421     pe_resource_t *rsc_then = NULL;
 422     pe_resource_t *rsc_first = NULL;
 423     int min_required_before = 0;
 424     enum pe_order_kind kind = pe_order_kind_mandatory;
 425     uint32_t cons_weight = pe_order_none;
 426     enum ordering_symmetry symmetry;
 427 
 428     const char *action_then = NULL;
 429     const char *action_first = NULL;
 430     const char *id = NULL;
 431 
 432     CRM_CHECK(xml_obj != NULL, return);
 433 
 434     id = crm_element_value(xml_obj, XML_ATTR_ID);
 435     if (id == NULL) {
 436         pcmk__config_err("Ignoring <%s> constraint without " XML_ATTR_ID,
 437                          crm_element_name(xml_obj));
 438         return;
 439     }
 440 
 441     rsc_first = get_ordering_resource(xml_obj, XML_ORDER_ATTR_FIRST,
 442                                       XML_ORDER_ATTR_FIRST_INSTANCE,
 443                                       data_set);
 444     if (rsc_first == NULL) {
 445         return;
 446     }
 447 
 448     rsc_then = get_ordering_resource(xml_obj, XML_ORDER_ATTR_THEN,
 449                                      XML_ORDER_ATTR_THEN_INSTANCE,
 450                                      data_set);
 451     if (rsc_then == NULL) {
 452         return;
 453     }
 454 
 455     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
 456     if (action_first == NULL) {
 457         action_first = RSC_START;
 458     }
 459 
 460     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
 461     if (action_then == NULL) {
 462         action_then = action_first;
 463     }
 464 
 465     kind = get_ordering_type(xml_obj);
 466 
 467     symmetry = get_ordering_symmetry(xml_obj, kind, NULL);
 468     cons_weight = ordering_flags_for_kind(kind, action_first, symmetry);
 469 
 470     handle_restart_type(rsc_then, kind, pe_order_implies_then, cons_weight);
 471 
 472     /* If there is a minimum number of instances that must be runnable before
 473      * the 'then' action is runnable, we use a pseudo-action for convenience:
 474      * minimum number of clone instances have runnable actions ->
 475      * pseudo-action is runnable -> dependency is runnable.
 476      */
 477     min_required_before = get_minimum_first_instances(rsc_first, xml_obj);
 478     if (min_required_before > 0) {
 479         clone_min_ordering(id, rsc_first, action_first, rsc_then, action_then,
 480                            cons_weight, min_required_before, data_set);
 481     } else {
 482         pcmk__order_resource_actions(rsc_first, action_first, rsc_then,
 483                                      action_then, cons_weight);
 484     }
 485 
 486     if (symmetry == ordering_symmetric) {
 487         inverse_ordering(id, kind, rsc_first, action_first,
 488                          rsc_then, action_then);
 489     }
 490 }
 491 
 492 /*!
 493  * \internal
 494  * \brief Create a new ordering between two actions
 495  *
 496  * \param[in,out] first_rsc          Resource for 'first' action (if NULL and
 497  *                                   \p first_action is a resource action, that
 498  *                                   resource will be used)
 499  * \param[in,out] first_action_task  Action key for 'first' action (if NULL and
 500  *                                   \p first_action is not NULL, its UUID will
 501  *                                   be used)
 502  * \param[in,out] first_action       'first' action (if NULL, \p first_rsc and
 503  *                                   \p first_action_task must be set)
 504  *
 505  * \param[in]     then_rsc           Resource for 'then' action (if NULL and
 506  *                                   \p then_action is a resource action, that
 507  *                                   resource will be used)
 508  * \param[in,out] then_action_task   Action key for 'then' action (if NULL and
 509  *                                   \p then_action is not NULL, its UUID will
 510  *                                   be used)
 511  * \param[in]     then_action        'then' action (if NULL, \p then_rsc and
 512  *                                   \p then_action_task must be set)
 513  *
 514  * \param[in]     flags              Flag set of enum pe_ordering
 515  * \param[in,out] data_set           Cluster working set to add ordering to
 516  *
 517  * \note This function takes ownership of first_action_task and
 518  *       then_action_task, which do not need to be freed by the caller.
 519  */
 520 void
 521 pcmk__new_ordering(pe_resource_t *first_rsc, char *first_action_task,
     /* [previous][next][first][last][top][bottom][index][help] */
 522                    pe_action_t *first_action, pe_resource_t *then_rsc,
 523                    char *then_action_task, pe_action_t *then_action,
 524                    uint32_t flags, pe_working_set_t *data_set)
 525 {
 526     pe__ordering_t *order = NULL;
 527 
 528     // One of action or resource must be specified for each side
 529     CRM_CHECK(((first_action != NULL) || (first_rsc != NULL))
 530               && ((then_action != NULL) || (then_rsc != NULL)),
 531               free(first_action_task); free(then_action_task); return);
 532 
 533     if ((first_rsc == NULL) && (first_action != NULL)) {
 534         first_rsc = first_action->rsc;
 535     }
 536     if ((then_rsc == NULL) && (then_action != NULL)) {
 537         then_rsc = then_action->rsc;
 538     }
 539 
 540     order = calloc(1, sizeof(pe__ordering_t));
 541     CRM_ASSERT(order != NULL);
 542 
 543     order->id = data_set->order_id++;
 544     order->flags = flags;
 545     order->lh_rsc = first_rsc;
 546     order->rh_rsc = then_rsc;
 547     order->lh_action = first_action;
 548     order->rh_action = then_action;
 549     order->lh_action_task = first_action_task;
 550     order->rh_action_task = then_action_task;
 551 
 552     if ((order->lh_action_task == NULL) && (first_action != NULL)) {
 553         order->lh_action_task = strdup(first_action->uuid);
 554     }
 555 
 556     if ((order->rh_action_task == NULL) && (then_action != NULL)) {
 557         order->rh_action_task = strdup(then_action->uuid);
 558     }
 559 
 560     if ((order->lh_rsc == NULL) && (first_action != NULL)) {
 561         order->lh_rsc = first_action->rsc;
 562     }
 563 
 564     if ((order->rh_rsc == NULL) && (then_action != NULL)) {
 565         order->rh_rsc = then_action->rsc;
 566     }
 567 
 568     pe_rsc_trace(first_rsc, "Created ordering %d for %s then %s",
 569                  (data_set->order_id - 1),
 570                  pcmk__s(order->lh_action_task, "an underspecified action"),
 571                  pcmk__s(order->rh_action_task, "an underspecified action"));
 572 
 573     data_set->ordering_constraints = g_list_prepend(data_set->ordering_constraints,
 574                                                     order);
 575     pcmk__order_migration_equivalents(order);
 576 }
 577 
 578 /*!
 579  * \brief Unpack a set in an ordering constraint
 580  *
 581  * \param[in]     set                   Set XML to unpack
 582  * \param[in]     parent_kind           rsc_order XML "kind" attribute
 583  * \param[in]     parent_symmetrical_s  rsc_order XML "symmetrical" attribute
 584  * \param[in,out] data_set              Cluster working set
 585  *
 586  * \return Standard Pacemaker return code
 587  */
 588 static int
 589 unpack_order_set(const xmlNode *set, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 590                  const char *parent_symmetrical_s, pe_working_set_t *data_set)
 591 {
 592     GList *set_iter = NULL;
 593     GList *resources = NULL;
 594 
 595     pe_resource_t *last = NULL;
 596     pe_resource_t *resource = NULL;
 597 
 598     int local_kind = parent_kind;
 599     bool sequential = false;
 600     uint32_t flags = pe_order_optional;
 601     enum ordering_symmetry symmetry;
 602 
 603     char *key = NULL;
 604     const char *id = ID(set);
 605     const char *action = crm_element_value(set, "action");
 606     const char *sequential_s = crm_element_value(set, "sequential");
 607     const char *kind_s = crm_element_value(set, XML_ORDER_ATTR_KIND);
 608 
 609     if (action == NULL) {
 610         action = RSC_START;
 611     }
 612 
 613     if (kind_s) {
 614         local_kind = get_ordering_type(set);
 615     }
 616     if (sequential_s == NULL) {
 617         sequential_s = "1";
 618     }
 619 
 620     sequential = crm_is_true(sequential_s);
 621 
 622     symmetry = get_ordering_symmetry(set, parent_kind, parent_symmetrical_s);
 623     flags = ordering_flags_for_kind(local_kind, action, symmetry);
 624 
 625     for (const xmlNode *xml_rsc = first_named_child(set, XML_TAG_RESOURCE_REF);
 626          xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 627 
 628         EXPAND_CONSTRAINT_IDREF(id, resource, ID(xml_rsc));
 629         resources = g_list_append(resources, resource);
 630     }
 631 
 632     if (pcmk__list_of_1(resources)) {
 633         crm_trace("Single set: %s", id);
 634         goto done;
 635     }
 636 
 637     set_iter = resources;
 638     while (set_iter != NULL) {
 639         resource = (pe_resource_t *) set_iter->data;
 640         set_iter = set_iter->next;
 641 
 642         key = pcmk__op_key(resource->id, action, 0);
 643 
 644         if (local_kind == pe_order_kind_serialize) {
 645             /* Serialize before everything that comes after */
 646 
 647             for (GList *gIter = set_iter; gIter != NULL; gIter = gIter->next) {
 648                 pe_resource_t *then_rsc = (pe_resource_t *) gIter->data;
 649                 char *then_key = pcmk__op_key(then_rsc->id, action, 0);
 650 
 651                 pcmk__new_ordering(resource, strdup(key), NULL, then_rsc,
 652                                    then_key, NULL, flags, data_set);
 653             }
 654 
 655         } else if (sequential) {
 656             if (last != NULL) {
 657                 pcmk__order_resource_actions(last, action, resource, action,
 658                                              flags);
 659             }
 660             last = resource;
 661         }
 662         free(key);
 663     }
 664 
 665     if (symmetry == ordering_asymmetric) {
 666         goto done;
 667     }
 668 
 669     last = NULL;
 670     action = invert_action(action);
 671 
 672     flags = ordering_flags_for_kind(local_kind, action,
 673                                     ordering_symmetric_inverse);
 674 
 675     set_iter = resources;
 676     while (set_iter != NULL) {
 677         resource = (pe_resource_t *) set_iter->data;
 678         set_iter = set_iter->next;
 679 
 680         if (sequential) {
 681             if (last != NULL) {
 682                 pcmk__order_resource_actions(resource, action, last, action,
 683                                              flags);
 684             }
 685             last = resource;
 686         }
 687     }
 688 
 689   done:
 690     g_list_free(resources);
 691     return pcmk_rc_ok;
 692 }
 693 
 694 /*!
 695  * \brief Order two resource sets relative to each other
 696  *
 697  * \param[in]     id        Ordering ID (for logging)
 698  * \param[in]     set1      First listed set
 699  * \param[in]     set2      Second listed set
 700  * \param[in]     kind      Ordering kind
 701  * \param[in,out] data_set  Cluster working set
 702  * \param[in]     symmetry  Which ordering symmetry applies to this relation
 703  *
 704  * \return Standard Pacemaker return code
 705  */
 706 static int
 707 order_rsc_sets(const char *id, const xmlNode *set1, const xmlNode *set2,
     /* [previous][next][first][last][top][bottom][index][help] */
 708                enum pe_order_kind kind, pe_working_set_t *data_set,
 709                enum ordering_symmetry symmetry)
 710 {
 711 
 712     const xmlNode *xml_rsc = NULL;
 713     const xmlNode *xml_rsc_2 = NULL;
 714 
 715     pe_resource_t *rsc_1 = NULL;
 716     pe_resource_t *rsc_2 = NULL;
 717 
 718     const char *action_1 = crm_element_value(set1, "action");
 719     const char *action_2 = crm_element_value(set2, "action");
 720 
 721     uint32_t flags = pe_order_none;
 722 
 723     bool require_all = true;
 724 
 725     (void) pcmk__xe_get_bool_attr(set1, "require-all", &require_all);
 726 
 727     if (action_1 == NULL) {
 728         action_1 = RSC_START;
 729     }
 730 
 731     if (action_2 == NULL) {
 732         action_2 = RSC_START;
 733     }
 734 
 735     if (symmetry == ordering_symmetric_inverse) {
 736         action_1 = invert_action(action_1);
 737         action_2 = invert_action(action_2);
 738     }
 739 
 740     if (pcmk__str_eq(RSC_STOP, action_1, pcmk__str_casei)
 741         || pcmk__str_eq(RSC_DEMOTE, action_1, pcmk__str_casei)) {
 742         /* Assuming: A -> ( B || C) -> D
 743          * The one-or-more logic only applies during the start/promote phase.
 744          * During shutdown neither B nor can shutdown until D is down, so simply
 745          * turn require_all back on.
 746          */
 747         require_all = true;
 748     }
 749 
 750     flags = ordering_flags_for_kind(kind, action_1, symmetry);
 751 
 752     /* If we have an unordered set1, whether it is sequential or not is
 753      * irrelevant in regards to set2.
 754      */
 755     if (!require_all) {
 756         char *task = crm_strdup_printf(CRM_OP_RELAXED_SET ":%s", ID(set1));
 757         pe_action_t *unordered_action = get_pseudo_op(task, data_set);
 758 
 759         free(task);
 760         pe__set_action_flags(unordered_action, pe_action_requires_any);
 761 
 762         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 763              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 764 
 765             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 766 
 767             /* Add an ordering constraint between every element in set1 and the
 768              * pseudo action. If any action in set1 is runnable the pseudo
 769              * action will be runnable.
 770              */
 771             pcmk__new_ordering(rsc_1, pcmk__op_key(rsc_1->id, action_1, 0),
 772                                NULL, NULL, NULL, unordered_action,
 773                                pe_order_one_or_more|pe_order_implies_then_printed,
 774                                data_set);
 775         }
 776         for (xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
 777              xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
 778 
 779             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
 780 
 781             /* Add an ordering constraint between the pseudo-action and every
 782              * element in set2. If the pseudo-action is runnable, every action
 783              * in set2 will be runnable.
 784              */
 785             pcmk__new_ordering(NULL, NULL, unordered_action,
 786                                rsc_2, pcmk__op_key(rsc_2->id, action_2, 0),
 787                                NULL, flags|pe_order_runnable_left, data_set);
 788         }
 789 
 790         return pcmk_rc_ok;
 791     }
 792 
 793     if (pcmk__xe_attr_is_true(set1, "sequential")) {
 794         if (symmetry == ordering_symmetric_inverse) {
 795             // Get the first one
 796             xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 797             if (xml_rsc != NULL) {
 798                 EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 799             }
 800 
 801         } else {
 802             // Get the last one
 803             const char *rid = NULL;
 804 
 805             for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 806                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 807 
 808                 rid = ID(xml_rsc);
 809             }
 810             EXPAND_CONSTRAINT_IDREF(id, rsc_1, rid);
 811         }
 812     }
 813 
 814     if (pcmk__xe_attr_is_true(set2, "sequential")) {
 815         if (symmetry == ordering_symmetric_inverse) {
 816             // Get the last one
 817             const char *rid = NULL;
 818 
 819             for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 820                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 821 
 822                 rid = ID(xml_rsc);
 823             }
 824             EXPAND_CONSTRAINT_IDREF(id, rsc_2, rid);
 825 
 826         } else {
 827             // Get the first one
 828             xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 829             if (xml_rsc != NULL) {
 830                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 831             }
 832         }
 833     }
 834 
 835     if ((rsc_1 != NULL) && (rsc_2 != NULL)) {
 836         pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2, flags);
 837 
 838     } else if (rsc_1 != NULL) {
 839         for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 840              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 841 
 842             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 843             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
 844                                          flags);
 845         }
 846 
 847     } else if (rsc_2 != NULL) {
 848         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 849              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 850 
 851             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 852             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
 853                                          flags);
 854         }
 855 
 856     } else {
 857         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 858              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 859 
 860             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 861 
 862             for (xmlNode *xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
 863                  xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
 864 
 865                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
 866                 pcmk__order_resource_actions(rsc_1, action_1, rsc_2,
 867                                              action_2, flags);
 868             }
 869         }
 870     }
 871 
 872     return pcmk_rc_ok;
 873 }
 874 
 875 /*!
 876  * \internal
 877  * \brief If an ordering constraint uses resource tags, expand them
 878  *
 879  * \param[in,out] xml_obj       Ordering constraint XML
 880  * \param[out]    expanded_xml  Equivalent XML with tags expanded
 881  * \param[in]     data_set      Cluster working set
 882  *
 883  * \return Standard Pacemaker return code (specifically, pcmk_rc_ok on success,
 884  *         and pcmk_rc_unpack_error on invalid configuration)
 885  */
 886 static int
 887 unpack_order_tags(xmlNode *xml_obj, xmlNode **expanded_xml,
     /* [previous][next][first][last][top][bottom][index][help] */
 888                   const pe_working_set_t *data_set)
 889 {
 890     const char *id_first = NULL;
 891     const char *id_then = NULL;
 892     const char *action_first = NULL;
 893     const char *action_then = NULL;
 894 
 895     pe_resource_t *rsc_first = NULL;
 896     pe_resource_t *rsc_then = NULL;
 897     pe_tag_t *tag_first = NULL;
 898     pe_tag_t *tag_then = NULL;
 899 
 900     xmlNode *rsc_set_first = NULL;
 901     xmlNode *rsc_set_then = NULL;
 902     bool any_sets = false;
 903 
 904     // Check whether there are any resource sets with template or tag references
 905     *expanded_xml = pcmk__expand_tags_in_sets(xml_obj, data_set);
 906     if (*expanded_xml != NULL) {
 907         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
 908         return pcmk_rc_ok;
 909     }
 910 
 911     id_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST);
 912     id_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN);
 913     if ((id_first == NULL) || (id_then == NULL)) {
 914         return pcmk_rc_ok;
 915     }
 916 
 917     if (!pcmk__valid_resource_or_tag(data_set, id_first, &rsc_first,
 918                                      &tag_first)) {
 919         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
 920                          "valid resource or tag", ID(xml_obj), id_first);
 921         return pcmk_rc_unpack_error;
 922     }
 923 
 924     if (!pcmk__valid_resource_or_tag(data_set, id_then, &rsc_then, &tag_then)) {
 925         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
 926                          "valid resource or tag", ID(xml_obj), id_then);
 927         return pcmk_rc_unpack_error;
 928     }
 929 
 930     if ((rsc_first != NULL) && (rsc_then != NULL)) {
 931         // Neither side references a template or tag
 932         return pcmk_rc_ok;
 933     }
 934 
 935     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
 936     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
 937 
 938     *expanded_xml = copy_xml(xml_obj);
 939 
 940     // Convert template/tag reference in "first" into resource_set under constraint
 941     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_first, XML_ORDER_ATTR_FIRST,
 942                           true, data_set)) {
 943         free_xml(*expanded_xml);
 944         *expanded_xml = NULL;
 945         return pcmk_rc_unpack_error;
 946     }
 947 
 948     if (rsc_set_first != NULL) {
 949         if (action_first != NULL) {
 950             // Move "first-action" into converted resource_set as "action"
 951             crm_xml_add(rsc_set_first, "action", action_first);
 952             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_FIRST_ACTION);
 953         }
 954         any_sets = true;
 955     }
 956 
 957     // Convert template/tag reference in "then" into resource_set under constraint
 958     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_then, XML_ORDER_ATTR_THEN,
 959                           true, data_set)) {
 960         free_xml(*expanded_xml);
 961         *expanded_xml = NULL;
 962         return pcmk_rc_unpack_error;
 963     }
 964 
 965     if (rsc_set_then != NULL) {
 966         if (action_then != NULL) {
 967             // Move "then-action" into converted resource_set as "action"
 968             crm_xml_add(rsc_set_then, "action", action_then);
 969             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_THEN_ACTION);
 970         }
 971         any_sets = true;
 972     }
 973 
 974     if (any_sets) {
 975         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
 976     } else {
 977         free_xml(*expanded_xml);
 978         *expanded_xml = NULL;
 979     }
 980 
 981     return pcmk_rc_ok;
 982 }
 983 
 984 /*!
 985  * \internal
 986  * \brief Unpack ordering constraint XML
 987  *
 988  * \param[in,out] xml_obj   Ordering constraint XML to unpack
 989  * \param[in,out] data_set  Cluster working set
 990  */
 991 void
 992 pcmk__unpack_ordering(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 993 {
 994     xmlNode *set = NULL;
 995     xmlNode *last = NULL;
 996 
 997     xmlNode *orig_xml = NULL;
 998     xmlNode *expanded_xml = NULL;
 999 
1000     const char *id = crm_element_value(xml_obj, XML_ATTR_ID);
1001     const char *invert = crm_element_value(xml_obj, XML_CONS_ATTR_SYMMETRICAL);
1002     enum pe_order_kind kind = get_ordering_type(xml_obj);
1003 
1004     enum ordering_symmetry symmetry = get_ordering_symmetry(xml_obj, kind,
1005                                                             NULL);
1006 
1007     // Expand any resource tags in the constraint XML
1008     if (unpack_order_tags(xml_obj, &expanded_xml, data_set) != pcmk_rc_ok) {
1009         return;
1010     }
1011     if (expanded_xml != NULL) {
1012         orig_xml = xml_obj;
1013         xml_obj = expanded_xml;
1014     }
1015 
1016     // If the constraint has resource sets, unpack them
1017     for (set = first_named_child(xml_obj, XML_CONS_TAG_RSC_SET);
1018          set != NULL; set = crm_next_same_xml(set)) {
1019 
1020         set = expand_idref(set, data_set->input);
1021         if ((set == NULL) // Configuration error, message already logged
1022             || (unpack_order_set(set, kind, invert, data_set) != pcmk_rc_ok)) {
1023 
1024             if (expanded_xml != NULL) {
1025                 free_xml(expanded_xml);
1026             }
1027             return;
1028         }
1029 
1030         if (last != NULL) {
1031 
1032             if (order_rsc_sets(id, last, set, kind, data_set,
1033                                symmetry) != pcmk_rc_ok) {
1034                 if (expanded_xml != NULL) {
1035                     free_xml(expanded_xml);
1036                 }
1037                 return;
1038             }
1039 
1040             if ((symmetry == ordering_symmetric)
1041                 && (order_rsc_sets(id, set, last, kind, data_set,
1042                                    ordering_symmetric_inverse) != pcmk_rc_ok)) {
1043                 if (expanded_xml != NULL) {
1044                     free_xml(expanded_xml);
1045                 }
1046                 return;
1047             }
1048 
1049         }
1050         last = set;
1051     }
1052 
1053     if (expanded_xml) {
1054         free_xml(expanded_xml);
1055         xml_obj = orig_xml;
1056     }
1057 
1058     // If the constraint has no resource sets, unpack it as a simple ordering
1059     if (last == NULL) {
1060         return unpack_simple_rsc_order(xml_obj, data_set);
1061     }
1062 }
1063 
1064 static bool
1065 ordering_is_invalid(pe_action_t *action, pe_action_wrapper_t *input)
     /* [previous][next][first][last][top][bottom][index][help] */
1066 {
1067     /* Prevent user-defined ordering constraints between resources
1068      * running in a guest node and the resource that defines that node.
1069      */
1070     if (!pcmk_is_set(input->type, pe_order_preserve)
1071         && (input->action->rsc != NULL)
1072         && pcmk__rsc_corresponds_to_guest(action->rsc, input->action->node)) {
1073 
1074         crm_warn("Invalid ordering constraint between %s and %s",
1075                  input->action->rsc->id, action->rsc->id);
1076         return true;
1077     }
1078 
1079     /* If there's an order like
1080      * "rscB_stop node2"-> "load_stopped_node2" -> "rscA_migrate_to node1"
1081      *
1082      * then rscA is being migrated from node1 to node2, while rscB is being
1083      * migrated from node2 to node1. If there would be a graph loop,
1084      * break the order "load_stopped_node2" -> "rscA_migrate_to node1".
1085      */
1086     if ((input->type == pe_order_load) && action->rsc
1087         && pcmk__str_eq(action->task, RSC_MIGRATE, pcmk__str_casei)
1088         && pcmk__graph_has_loop(action, action, input)) {
1089         return true;
1090     }
1091 
1092     return false;
1093 }
1094 
1095 void
1096 pcmk__disable_invalid_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1097 {
1098     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
1099         pe_action_t *action = (pe_action_t *) iter->data;
1100         pe_action_wrapper_t *input = NULL;
1101 
1102         for (GList *input_iter = action->actions_before;
1103              input_iter != NULL; input_iter = input_iter->next) {
1104 
1105             input = (pe_action_wrapper_t *) input_iter->data;
1106             if (ordering_is_invalid(action, input)) {
1107                 input->type = pe_order_none;
1108             }
1109         }
1110     }
1111 }
1112 
1113 /*!
1114  * \internal
1115  * \brief Order stops on a node before the node's shutdown
1116  *
1117  * \param[in,out] node         Node being shut down
1118  * \param[in]     shutdown_op  Shutdown action for node
1119  */
1120 void
1121 pcmk__order_stops_before_shutdown(pe_node_t *node, pe_action_t *shutdown_op)
     /* [previous][next][first][last][top][bottom][index][help] */
1122 {
1123     for (GList *iter = node->details->data_set->actions;
1124          iter != NULL; iter = iter->next) {
1125 
1126         pe_action_t *action = (pe_action_t *) iter->data;
1127 
1128         // Only stops on the node shutting down are relevant
1129         if ((action->rsc == NULL) || (action->node == NULL)
1130             || (action->node->details != node->details)
1131             || !pcmk__str_eq(action->task, RSC_STOP, pcmk__str_casei)) {
1132             continue;
1133         }
1134 
1135         // Resources and nodes in maintenance mode won't be touched
1136 
1137         if (pcmk_is_set(action->rsc->flags, pe_rsc_maintenance)) {
1138             pe_rsc_trace(action->rsc,
1139                          "Not ordering %s before shutdown of %s because "
1140                          "resource in maintenance mode",
1141                          action->uuid, pe__node_name(node));
1142             continue;
1143 
1144         } else if (node->details->maintenance) {
1145             pe_rsc_trace(action->rsc,
1146                          "Not ordering %s before shutdown of %s because "
1147                          "node in maintenance mode",
1148                          action->uuid, pe__node_name(node));
1149             continue;
1150         }
1151 
1152         /* Don't touch a resource that is unmanaged or blocked, to avoid
1153          * blocking the shutdown (though if another action depends on this one,
1154          * we may still end up blocking)
1155          */
1156         if (!pcmk_any_flags_set(action->rsc->flags,
1157                                 pe_rsc_managed|pe_rsc_block)) {
1158             pe_rsc_trace(action->rsc,
1159                          "Not ordering %s before shutdown of %s because "
1160                          "resource is unmanaged or blocked",
1161                          action->uuid, pe__node_name(node));
1162             continue;
1163         }
1164 
1165         pe_rsc_trace(action->rsc, "Ordering %s before shutdown of %s",
1166                      action->uuid, pe__node_name(node));
1167         pe__clear_action_flags(action, pe_action_optional);
1168         pcmk__new_ordering(action->rsc, NULL, action, NULL,
1169                            strdup(CRM_OP_SHUTDOWN), shutdown_op,
1170                            pe_order_optional|pe_order_runnable_left,
1171                            node->details->data_set);
1172     }
1173 }
1174 
1175 /*!
1176  * \brief Find resource actions matching directly or as child
1177  *
1178  * \param[in] rsc           Resource to check
1179  * \param[in] original_key  Action key to search for (possibly referencing
1180  *                          parent of \rsc)
1181  *
1182  * \return Newly allocated list of matching actions
1183  * \note It is the caller's responsibility to free the result with g_list_free()
1184  */
1185 static GList *
1186 find_actions_by_task(const pe_resource_t *rsc, const char *original_key)
     /* [previous][next][first][last][top][bottom][index][help] */
1187 {
1188     // Search under given task key directly
1189     GList *list = find_actions(rsc->actions, original_key, NULL);
1190 
1191     if (list == NULL) {
1192         // Search again using this resource's ID
1193         char *key = NULL;
1194         char *task = NULL;
1195         guint interval_ms = 0;
1196 
1197         if (parse_op_key(original_key, NULL, &task, &interval_ms)) {
1198             key = pcmk__op_key(rsc->id, task, interval_ms);
1199             list = find_actions(rsc->actions, key, NULL);
1200             free(key);
1201             free(task);
1202         } else {
1203             crm_err("Invalid operation key (bug?): %s", original_key);
1204         }
1205     }
1206     return list;
1207 }
1208 
1209 /*!
1210  * \internal
1211  * \brief Order relevant resource actions after a given action
1212  *
1213  * \param[in,out] first_action  Action to order after (or NULL if none runnable)
1214  * \param[in]     rsc           Resource whose actions should be ordered
1215  * \param[in,out] order         Ordering constraint being applied
1216  */
1217 static void
1218 order_resource_actions_after(pe_action_t *first_action,
     /* [previous][next][first][last][top][bottom][index][help] */
1219                              const pe_resource_t *rsc, pe__ordering_t *order)
1220 {
1221     GList *then_actions = NULL;
1222     uint32_t flags = pe_order_none;
1223 
1224     CRM_CHECK((rsc != NULL) && (order != NULL), return);
1225 
1226     flags = order->flags;
1227     pe_rsc_trace(rsc, "Applying ordering %d for 'then' resource %s",
1228                  order->id, rsc->id);
1229 
1230     if (order->rh_action != NULL) {
1231         then_actions = g_list_prepend(NULL, order->rh_action);
1232 
1233     } else {
1234         then_actions = find_actions_by_task(rsc, order->rh_action_task);
1235     }
1236 
1237     if (then_actions == NULL) {
1238         pe_rsc_trace(rsc, "Ignoring ordering %d: no %s actions found for %s",
1239                      order->id, order->rh_action_task, rsc->id);
1240         return;
1241     }
1242 
1243     if ((first_action != NULL) && (first_action->rsc == rsc)
1244         && pcmk_is_set(first_action->flags, pe_action_dangle)) {
1245 
1246         pe_rsc_trace(rsc,
1247                      "Detected dangling migration ordering (%s then %s %s)",
1248                      first_action->uuid, order->rh_action_task, rsc->id);
1249         pe__clear_order_flags(flags, pe_order_implies_then);
1250     }
1251 
1252     if ((first_action == NULL) && !pcmk_is_set(flags, pe_order_implies_then)) {
1253         pe_rsc_debug(rsc,
1254                      "Ignoring ordering %d for %s: No first action found",
1255                      order->id, rsc->id);
1256         g_list_free(then_actions);
1257         return;
1258     }
1259 
1260     for (GList *iter = then_actions; iter != NULL; iter = iter->next) {
1261         pe_action_t *then_action_iter = (pe_action_t *) iter->data;
1262 
1263         if (first_action != NULL) {
1264             order_actions(first_action, then_action_iter, flags);
1265         } else {
1266             pe__clear_action_flags(then_action_iter, pe_action_runnable);
1267             crm_warn("%s of %s is unrunnable because there is no %s of %s "
1268                      "to order it after", then_action_iter->task, rsc->id,
1269                      order->lh_action_task, order->lh_rsc->id);
1270         }
1271     }
1272 
1273     g_list_free(then_actions);
1274 }
1275 
1276 static void
1277 rsc_order_first(pe_resource_t *first_rsc, pe__ordering_t *order,
     /* [previous][next][first][last][top][bottom][index][help] */
1278                 pe_working_set_t *data_set)
1279 {
1280     GList *first_actions = NULL;
1281     pe_action_t *first_action = order->lh_action;
1282     pe_resource_t *then_rsc = order->rh_rsc;
1283 
1284     CRM_ASSERT(first_rsc != NULL);
1285     pe_rsc_trace(first_rsc, "Applying ordering constraint %d (first: %s)",
1286                  order->id, first_rsc->id);
1287 
1288     if (first_action != NULL) {
1289         first_actions = g_list_prepend(NULL, first_action);
1290 
1291     } else {
1292         first_actions = find_actions_by_task(first_rsc, order->lh_action_task);
1293     }
1294 
1295     if ((first_actions == NULL) && (first_rsc == then_rsc)) {
1296         pe_rsc_trace(first_rsc,
1297                      "Ignoring constraint %d: first (%s for %s) not found",
1298                      order->id, order->lh_action_task, first_rsc->id);
1299 
1300     } else if (first_actions == NULL) {
1301         char *key = NULL;
1302         char *op_type = NULL;
1303         guint interval_ms = 0;
1304 
1305         parse_op_key(order->lh_action_task, NULL, &op_type, &interval_ms);
1306         key = pcmk__op_key(first_rsc->id, op_type, interval_ms);
1307 
1308         if ((first_rsc->fns->state(first_rsc, TRUE) == RSC_ROLE_STOPPED)
1309             && pcmk__str_eq(op_type, RSC_STOP, pcmk__str_casei)) {
1310             free(key);
1311             pe_rsc_trace(first_rsc,
1312                          "Ignoring constraint %d: first (%s for %s) not found",
1313                          order->id, order->lh_action_task, first_rsc->id);
1314 
1315         } else if ((first_rsc->fns->state(first_rsc, TRUE) == RSC_ROLE_UNPROMOTED)
1316                    && pcmk__str_eq(op_type, RSC_DEMOTE, pcmk__str_casei)) {
1317             free(key);
1318             pe_rsc_trace(first_rsc,
1319                          "Ignoring constraint %d: first (%s for %s) not found",
1320                          order->id, order->lh_action_task, first_rsc->id);
1321 
1322         } else {
1323             pe_rsc_trace(first_rsc,
1324                          "Creating first (%s for %s) for constraint %d ",
1325                          order->lh_action_task, first_rsc->id, order->id);
1326             first_action = custom_action(first_rsc, key, op_type, NULL, TRUE,
1327                                          TRUE, data_set);
1328             first_actions = g_list_prepend(NULL, first_action);
1329         }
1330 
1331         free(op_type);
1332     }
1333 
1334     if (then_rsc == NULL) {
1335         if (order->rh_action == NULL) {
1336             pe_rsc_trace(first_rsc, "Ignoring constraint %d: then not found",
1337                          order->id);
1338             return;
1339         }
1340         then_rsc = order->rh_action->rsc;
1341     }
1342     for (GList *gIter = first_actions; gIter != NULL; gIter = gIter->next) {
1343         first_action = (pe_action_t *) gIter->data;
1344 
1345         if (then_rsc == NULL) {
1346             order_actions(first_action, order->rh_action, order->flags);
1347 
1348         } else {
1349             order_resource_actions_after(first_action, then_rsc, order);
1350         }
1351     }
1352 
1353     g_list_free(first_actions);
1354 }
1355 
1356 void
1357 pcmk__apply_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1358 {
1359     crm_trace("Applying ordering constraints");
1360 
1361     /* Ordering constraints need to be processed in the order they were created.
1362      * rsc_order_first() and order_resource_actions_after() require the relevant
1363      * actions to already exist in some cases, but rsc_order_first() will create
1364      * the 'first' action in certain cases. Thus calling rsc_order_first() can
1365      * change the behavior of later-created orderings.
1366      *
1367      * Also, g_list_append() should be avoided for performance reasons, so we
1368      * prepend orderings when creating them and reverse the list here.
1369      *
1370      * @TODO This is brittle and should be carefully redesigned so that the
1371      * order of creation doesn't matter, and the reverse becomes unneeded.
1372      */
1373     data_set->ordering_constraints = g_list_reverse(data_set->ordering_constraints);
1374 
1375     for (GList *gIter = data_set->ordering_constraints;
1376          gIter != NULL; gIter = gIter->next) {
1377 
1378         pe__ordering_t *order = gIter->data;
1379         pe_resource_t *rsc = order->lh_rsc;
1380 
1381         if (rsc != NULL) {
1382             rsc_order_first(rsc, order, data_set);
1383             continue;
1384         }
1385 
1386         rsc = order->rh_rsc;
1387         if (rsc != NULL) {
1388             order_resource_actions_after(order->lh_action, rsc, order);
1389 
1390         } else {
1391             crm_trace("Applying ordering constraint %d (non-resource actions)",
1392                       order->id);
1393             order_actions(order->lh_action, order->rh_action, order->flags);
1394         }
1395     }
1396 
1397     g_list_foreach(data_set->actions, (GFunc) pcmk__block_colocation_dependents,
1398                    data_set);
1399 
1400     crm_trace("Ordering probes");
1401     pcmk__order_probes(data_set);
1402 
1403     crm_trace("Updating %d actions", g_list_length(data_set->actions));
1404     g_list_foreach(data_set->actions,
1405                    (GFunc) pcmk__update_action_for_orderings, data_set);
1406 
1407     pcmk__disable_invalid_orderings(data_set);
1408 }
1409 
1410 /*!
1411  * \internal
1412  * \brief Order a given action after each action in a given list
1413  *
1414  * \param[in,out] after  "After" action
1415  * \param[in,out] list   List of "before" actions
1416  */
1417 void
1418 pcmk__order_after_each(pe_action_t *after, GList *list)
     /* [previous][next][first][last][top][bottom][index][help] */
1419 {
1420     const char *after_desc = (after->task == NULL)? after->uuid : after->task;
1421 
1422     for (GList *iter = list; iter != NULL; iter = iter->next) {
1423         pe_action_t *before = (pe_action_t *) iter->data;
1424         const char *before_desc = before->task? before->task : before->uuid;
1425 
1426         crm_debug("Ordering %s on %s before %s on %s",
1427                   before_desc, pe__node_name(before->node),
1428                   after_desc, pe__node_name(after->node));
1429         order_actions(before, after, pe_order_optional);
1430     }
1431 }
1432 
1433 /*!
1434  * \internal
1435  * \brief Order promotions and demotions for restarts of a clone or bundle
1436  *
1437  * \param[in,out] rsc  Clone or bundle to order
1438  */
1439 void
1440 pcmk__promotable_restart_ordering(pe_resource_t *rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
1441 {
1442     // Order start and promote after all instances are stopped
1443     pcmk__order_resource_actions(rsc, RSC_STOPPED, rsc, RSC_START,
1444                                  pe_order_optional);
1445     pcmk__order_resource_actions(rsc, RSC_STOPPED, rsc, RSC_PROMOTE,
1446                                  pe_order_optional);
1447 
1448     // Order stop, start, and promote after all instances are demoted
1449     pcmk__order_resource_actions(rsc, RSC_DEMOTED, rsc, RSC_STOP,
1450                                  pe_order_optional);
1451     pcmk__order_resource_actions(rsc, RSC_DEMOTED, rsc, RSC_START,
1452                                  pe_order_optional);
1453     pcmk__order_resource_actions(rsc, RSC_DEMOTED, rsc, RSC_PROMOTE,
1454                                  pe_order_optional);
1455 
1456     // Order promote after all instances are started
1457     pcmk__order_resource_actions(rsc, RSC_STARTED, rsc, RSC_PROMOTE,
1458                                  pe_order_optional);
1459 
1460     // Order demote after all instances are demoted
1461     pcmk__order_resource_actions(rsc, RSC_DEMOTE, rsc, RSC_DEMOTED,
1462                                  pe_order_optional);
1463 }

/* [previous][next][first][last][top][bottom][index][help] */