root/lib/pacemaker/pcmk_sched_clone.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. sort_rsc_id
  2. parent_node_instance
  3. did_fail
  4. sort_clone_instance
  5. can_run_instance
  6. allocate_instance
  7. append_parent_colocation
  8. distribute_children
  9. pcmk__clone_allocate
  10. clone_update_pseudo_status
  11. find_rsc_action
  12. child_ordering_constraints
  13. clone_create_actions
  14. clone_create_pseudo_actions
  15. clone_internal_constraints
  16. assign_node
  17. is_child_compatible
  18. find_compatible_child
  19. clone_rsc_colocation_lh
  20. clone_rsc_colocation_rh
  21. clone_child_action
  22. summary_action_flags
  23. clone_action_flags
  24. clone_rsc_location
  25. clone_expand
  26. rsc_known_on
  27. find_instance_on
  28. probe_unique_clone
  29. probe_anonymous_clone
  30. clone_create_probe
  31. clone_append_meta

   1 /*
   2  * Copyright 2004-2020 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <crm/msg_xml.h>
  13 #include <pacemaker-internal.h>
  14 
  15 #define VARIANT_CLONE 1
  16 #include <lib/pengine/variant.h>
  17 
  18 gint sort_clone_instance(gconstpointer a, gconstpointer b, gpointer data_set);
  19 static void append_parent_colocation(pe_resource_t * rsc, pe_resource_t * child, gboolean all);
  20 
  21 static gint
  22 sort_rsc_id(gconstpointer a, gconstpointer b)
     /* [previous][next][first][last][top][bottom][index][help] */
  23 {
  24     const pe_resource_t *resource1 = (const pe_resource_t *)a;
  25     const pe_resource_t *resource2 = (const pe_resource_t *)b;
  26     long num1, num2;
  27 
  28     CRM_ASSERT(resource1 != NULL);
  29     CRM_ASSERT(resource2 != NULL);
  30 
  31     /*
  32      * Sort clone instances numerically by instance number, so instance :10
  33      * comes after :9.
  34      */
  35     num1 = strtol(strrchr(resource1->id, ':') + 1, NULL, 10);
  36     num2 = strtol(strrchr(resource2->id, ':') + 1, NULL, 10);
  37     if (num1 < num2) {
  38         return -1;
  39     } else if (num1 > num2) {
  40         return 1;
  41     }
  42     return 0;
  43 }
  44 
  45 static pe_node_t *
  46 parent_node_instance(const pe_resource_t * rsc, pe_node_t * node)
     /* [previous][next][first][last][top][bottom][index][help] */
  47 {
  48     pe_node_t *ret = NULL;
  49 
  50     if (node != NULL && rsc->parent) {
  51         ret = pe_hash_table_lookup(rsc->parent->allowed_nodes, node->details->id);
  52     } else if(node != NULL) {
  53         ret = pe_hash_table_lookup(rsc->allowed_nodes, node->details->id);
  54     }
  55     return ret;
  56 }
  57 
  58 static gboolean
  59 did_fail(const pe_resource_t * rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
  60 {
  61     GListPtr gIter = rsc->children;
  62 
  63     if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
  64         return TRUE;
  65     }
  66 
  67     for (; gIter != NULL; gIter = gIter->next) {
  68         pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
  69 
  70         if (did_fail(child_rsc)) {
  71             return TRUE;
  72         }
  73     }
  74     return FALSE;
  75 }
  76 
  77 gint
  78 sort_clone_instance(gconstpointer a, gconstpointer b, gpointer data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
  79 {
  80     int rc = 0;
  81     pe_node_t *node1 = NULL;
  82     pe_node_t *node2 = NULL;
  83     pe_node_t *current_node1 = NULL;
  84     pe_node_t *current_node2 = NULL;
  85     unsigned int nnodes1 = 0;
  86     unsigned int nnodes2 = 0;
  87 
  88     gboolean can1 = TRUE;
  89     gboolean can2 = TRUE;
  90 
  91     const pe_resource_t *resource1 = (const pe_resource_t *)a;
  92     const pe_resource_t *resource2 = (const pe_resource_t *)b;
  93 
  94     CRM_ASSERT(resource1 != NULL);
  95     CRM_ASSERT(resource2 != NULL);
  96 
  97     /* allocation order:
  98      *  - active instances
  99      *  - instances running on nodes with the least copies
 100      *  - active instances on nodes that can't support them or are to be fenced
 101      *  - failed instances
 102      *  - inactive instances
 103      */
 104 
 105     current_node1 = pe__find_active_on(resource1, &nnodes1, NULL);
 106     current_node2 = pe__find_active_on(resource2, &nnodes2, NULL);
 107 
 108     if (nnodes1 && nnodes2) {
 109         if (nnodes1 < nnodes2) {
 110             crm_trace("%s < %s: running_on", resource1->id, resource2->id);
 111             return -1;
 112 
 113         } else if (nnodes1 > nnodes2) {
 114             crm_trace("%s > %s: running_on", resource1->id, resource2->id);
 115             return 1;
 116         }
 117     }
 118 
 119     node1 = current_node1;
 120     node2 = current_node2;
 121     if (node1) {
 122         pe_node_t *match = pe_hash_table_lookup(resource1->allowed_nodes, node1->details->id);
 123 
 124         if (match == NULL || match->weight < 0) {
 125             crm_trace("%s: current location is unavailable", resource1->id);
 126             node1 = NULL;
 127             can1 = FALSE;
 128         }
 129     }
 130 
 131     if (node2) {
 132         pe_node_t *match = pe_hash_table_lookup(resource2->allowed_nodes, node2->details->id);
 133 
 134         if (match == NULL || match->weight < 0) {
 135             crm_trace("%s: current location is unavailable", resource2->id);
 136             node2 = NULL;
 137             can2 = FALSE;
 138         }
 139     }
 140 
 141     if (can1 != can2) {
 142         if (can1) {
 143             crm_trace("%s < %s: availability of current location", resource1->id, resource2->id);
 144             return -1;
 145         }
 146         crm_trace("%s > %s: availability of current location", resource1->id, resource2->id);
 147         return 1;
 148     }
 149 
 150     if (resource1->priority < resource2->priority) {
 151         crm_trace("%s < %s: priority", resource1->id, resource2->id);
 152         return 1;
 153 
 154     } else if (resource1->priority > resource2->priority) {
 155         crm_trace("%s > %s: priority", resource1->id, resource2->id);
 156         return -1;
 157     }
 158 
 159     if (node1 == NULL && node2 == NULL) {
 160         crm_trace("%s == %s: not active", resource1->id, resource2->id);
 161         return 0;
 162     }
 163 
 164     if (node1 != node2) {
 165         if (node1 == NULL) {
 166             crm_trace("%s > %s: active", resource1->id, resource2->id);
 167             return 1;
 168         } else if (node2 == NULL) {
 169             crm_trace("%s < %s: active", resource1->id, resource2->id);
 170             return -1;
 171         }
 172     }
 173 
 174     can1 = can_run_resources(node1);
 175     can2 = can_run_resources(node2);
 176     if (can1 != can2) {
 177         if (can1) {
 178             crm_trace("%s < %s: can", resource1->id, resource2->id);
 179             return -1;
 180         }
 181         crm_trace("%s > %s: can", resource1->id, resource2->id);
 182         return 1;
 183     }
 184 
 185     node1 = parent_node_instance(resource1, node1);
 186     node2 = parent_node_instance(resource2, node2);
 187     if (node1 != NULL && node2 == NULL) {
 188         crm_trace("%s < %s: not allowed", resource1->id, resource2->id);
 189         return -1;
 190     } else if (node1 == NULL && node2 != NULL) {
 191         crm_trace("%s > %s: not allowed", resource1->id, resource2->id);
 192         return 1;
 193     }
 194 
 195     if (node1 == NULL || node2 == NULL) {
 196         crm_trace("%s == %s: not allowed", resource1->id, resource2->id);
 197         return 0;
 198     }
 199 
 200     if (node1->count < node2->count) {
 201         crm_trace("%s < %s: count", resource1->id, resource2->id);
 202         return -1;
 203 
 204     } else if (node1->count > node2->count) {
 205         crm_trace("%s > %s: count", resource1->id, resource2->id);
 206         return 1;
 207     }
 208 
 209     can1 = did_fail(resource1);
 210     can2 = did_fail(resource2);
 211     if (can1 != can2) {
 212         if (can1) {
 213             crm_trace("%s > %s: failed", resource1->id, resource2->id);
 214             return 1;
 215         }
 216         crm_trace("%s < %s: failed", resource1->id, resource2->id);
 217         return -1;
 218     }
 219 
 220     if (node1 && node2) {
 221         int lpc = 0;
 222         int max = 0;
 223         pe_node_t *n = NULL;
 224         GListPtr gIter = NULL;
 225         GListPtr list1 = NULL;
 226         GListPtr list2 = NULL;
 227         GHashTable *hash1 =
 228             g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free);
 229         GHashTable *hash2 =
 230             g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free);
 231 
 232         n = pe__copy_node(current_node1);
 233         g_hash_table_insert(hash1, (gpointer) n->details->id, n);
 234 
 235         n = pe__copy_node(current_node2);
 236         g_hash_table_insert(hash2, (gpointer) n->details->id, n);
 237 
 238         if(resource1->parent) {
 239             for (gIter = resource1->parent->rsc_cons; gIter; gIter = gIter->next) {
 240                 rsc_colocation_t *constraint = (rsc_colocation_t *) gIter->data;
 241 
 242                 if (constraint->score == 0) {
 243                     continue;
 244                 }
 245                 crm_trace("Applying %s to %s", constraint->id, resource1->id);
 246 
 247                 hash1 = pcmk__native_merge_weights(constraint->rsc_rh,
 248                                                    resource1->id, hash1,
 249                                                    constraint->node_attribute,
 250                                                    constraint->score / (float) INFINITY,
 251                                                    0);
 252             }
 253 
 254             for (gIter = resource1->parent->rsc_cons_lhs; gIter; gIter = gIter->next) {
 255                 rsc_colocation_t *constraint = (rsc_colocation_t *) gIter->data;
 256 
 257                 if (constraint->score == 0) {
 258                     continue;
 259                 }
 260                 crm_trace("Applying %s to %s", constraint->id, resource1->id);
 261 
 262                 hash1 = pcmk__native_merge_weights(constraint->rsc_lh,
 263                                                    resource1->id, hash1,
 264                                                    constraint->node_attribute,
 265                                                    constraint->score / (float) INFINITY,
 266                                                    pe_weights_positive);
 267             }
 268         }
 269 
 270         if(resource2->parent) {
 271             for (gIter = resource2->parent->rsc_cons; gIter; gIter = gIter->next) {
 272                 rsc_colocation_t *constraint = (rsc_colocation_t *) gIter->data;
 273 
 274                 crm_trace("Applying %s to %s", constraint->id, resource2->id);
 275 
 276                 hash2 = pcmk__native_merge_weights(constraint->rsc_rh,
 277                                                    resource2->id, hash2,
 278                                                    constraint->node_attribute,
 279                                                    constraint->score / (float) INFINITY,
 280                                                    0);
 281             }
 282 
 283             for (gIter = resource2->parent->rsc_cons_lhs; gIter; gIter = gIter->next) {
 284                 rsc_colocation_t *constraint = (rsc_colocation_t *) gIter->data;
 285 
 286                 crm_trace("Applying %s to %s", constraint->id, resource2->id);
 287 
 288                 hash2 = pcmk__native_merge_weights(constraint->rsc_lh,
 289                                                    resource2->id, hash2,
 290                                                    constraint->node_attribute,
 291                                                    constraint->score / (float) INFINITY,
 292                                                    pe_weights_positive);
 293             }
 294         }
 295 
 296         /* Current location score */
 297         node1 = g_hash_table_lookup(hash1, current_node1->details->id);
 298         node2 = g_hash_table_lookup(hash2, current_node2->details->id);
 299 
 300         if (node1->weight < node2->weight) {
 301             if (node1->weight < 0) {
 302                 crm_trace("%s > %s: current score: %d %d", resource1->id, resource2->id, node1->weight, node2->weight);
 303                 rc = -1;
 304                 goto out;
 305 
 306             } else {
 307                 crm_trace("%s < %s: current score: %d %d", resource1->id, resource2->id, node1->weight, node2->weight);
 308                 rc = 1;
 309                 goto out;
 310             }
 311 
 312         } else if (node1->weight > node2->weight) {
 313             crm_trace("%s > %s: current score: %d %d", resource1->id, resource2->id, node1->weight, node2->weight);
 314             rc = -1;
 315             goto out;
 316         }
 317 
 318         /* All location scores */
 319         list1 = g_hash_table_get_values(hash1);
 320         list2 = g_hash_table_get_values(hash2);
 321 
 322         list1 = sort_nodes_by_weight(list1, current_node1, data_set);
 323         list2 = sort_nodes_by_weight(list2, current_node2, data_set);
 324         max = g_list_length(list1);
 325         if (max < g_list_length(list2)) {
 326             max = g_list_length(list2);
 327         }
 328 
 329         for (; lpc < max; lpc++) {
 330             node1 = g_list_nth_data(list1, lpc);
 331             node2 = g_list_nth_data(list2, lpc);
 332             if (node1 == NULL) {
 333                 crm_trace("%s < %s: colocated score NULL", resource1->id, resource2->id);
 334                 rc = 1;
 335                 break;
 336 
 337             } else if (node2 == NULL) {
 338                 crm_trace("%s > %s: colocated score NULL", resource1->id, resource2->id);
 339                 rc = -1;
 340                 break;
 341             }
 342 
 343             if (node1->weight < node2->weight) {
 344                 crm_trace("%s < %s: colocated score", resource1->id, resource2->id);
 345                 rc = 1;
 346                 break;
 347 
 348             } else if (node1->weight > node2->weight) {
 349                 crm_trace("%s > %s: colocated score", resource1->id, resource2->id);
 350                 rc = -1;
 351                 break;
 352             }
 353         }
 354 
 355         /* Order by reverse uname - same as sort_node_weight() does? */
 356   out:
 357         g_hash_table_destroy(hash1);    /* Free mem */
 358         g_hash_table_destroy(hash2);    /* Free mem */
 359         g_list_free(list1);
 360         g_list_free(list2);
 361 
 362         if (rc != 0) {
 363             return rc;
 364         }
 365     }
 366 
 367     rc = strcmp(resource1->id, resource2->id);
 368     crm_trace("%s %c %s: default", resource1->id, rc < 0 ? '<' : '>', resource2->id);
 369     return rc;
 370 }
 371 
 372 static pe_node_t *
 373 can_run_instance(pe_resource_t * rsc, pe_node_t * node, int limit)
     /* [previous][next][first][last][top][bottom][index][help] */
 374 {
 375     pe_node_t *local_node = NULL;
 376 
 377     if (node == NULL && rsc->allowed_nodes) {
 378         GHashTableIter iter;
 379         g_hash_table_iter_init(&iter, rsc->allowed_nodes);
 380         while (g_hash_table_iter_next(&iter, NULL, (void **)&local_node)) {
 381             can_run_instance(rsc, local_node, limit);
 382         }
 383         return NULL;
 384     }
 385 
 386     if (!node) {
 387         /* make clang analyzer happy */
 388         goto bail;
 389 
 390     } else if (can_run_resources(node) == FALSE) {
 391         goto bail;
 392 
 393     } else if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
 394         goto bail;
 395     }
 396 
 397     local_node = parent_node_instance(rsc, node);
 398 
 399     if (local_node == NULL) {
 400         crm_warn("%s cannot run on %s: node not allowed", rsc->id, node->details->uname);
 401         goto bail;
 402 
 403     } else if (local_node->weight < 0) {
 404         common_update_score(rsc, node->details->id, local_node->weight);
 405         pe_rsc_trace(rsc, "%s cannot run on %s: Parent node weight doesn't allow it.",
 406                      rsc->id, node->details->uname);
 407 
 408     } else if (local_node->count < limit) {
 409         pe_rsc_trace(rsc, "%s can run on %s (already running %d)",
 410                      rsc->id, node->details->uname, local_node->count);
 411         return local_node;
 412 
 413     } else {
 414         pe_rsc_trace(rsc, "%s cannot run on %s: node full (%d >= %d)",
 415                      rsc->id, node->details->uname, local_node->count, limit);
 416     }
 417 
 418   bail:
 419     if (node) {
 420         common_update_score(rsc, node->details->id, -INFINITY);
 421     }
 422     return NULL;
 423 }
 424 
 425 static pe_node_t *
 426 allocate_instance(pe_resource_t *rsc, pe_node_t *prefer, gboolean all_coloc,
     /* [previous][next][first][last][top][bottom][index][help] */
 427                   int limit, pe_working_set_t *data_set)
 428 {
 429     pe_node_t *chosen = NULL;
 430     GHashTable *backup = NULL;
 431 
 432     CRM_ASSERT(rsc);
 433     pe_rsc_trace(rsc, "Checking allocation of %s (preferring %s, using %s parent colocations)",
 434                  rsc->id, (prefer? prefer->details->uname: "none"),
 435                  (all_coloc? "all" : "some"));
 436 
 437     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
 438         return rsc->fns->location(rsc, NULL, FALSE);
 439 
 440     } else if (pcmk_is_set(rsc->flags, pe_rsc_allocating)) {
 441         pe_rsc_debug(rsc, "Dependency loop detected involving %s", rsc->id);
 442         return NULL;
 443     }
 444 
 445     /* Only include positive colocation preferences of dependent resources
 446      * if not every node will get a copy of the clone
 447      */
 448     append_parent_colocation(rsc->parent, rsc, all_coloc);
 449 
 450     if (prefer) {
 451         pe_node_t *local_prefer = g_hash_table_lookup(rsc->allowed_nodes, prefer->details->id);
 452 
 453         if (local_prefer == NULL || local_prefer->weight < 0) {
 454             pe_rsc_trace(rsc, "Not pre-allocating %s to %s - unavailable", rsc->id,
 455                          prefer->details->uname);
 456             return NULL;
 457         }
 458     }
 459 
 460     can_run_instance(rsc, NULL, limit);
 461 
 462     backup = pcmk__copy_node_table(rsc->allowed_nodes);
 463     pe_rsc_trace(rsc, "Allocating instance %s", rsc->id);
 464     chosen = rsc->cmds->allocate(rsc, prefer, data_set);
 465     if (chosen && prefer && (chosen->details != prefer->details)) {
 466         crm_info("Not pre-allocating %s to %s because %s is better",
 467                  rsc->id, prefer->details->uname, chosen->details->uname);
 468         g_hash_table_destroy(rsc->allowed_nodes);
 469         rsc->allowed_nodes = backup;
 470         native_deallocate(rsc);
 471         chosen = NULL;
 472         backup = NULL;
 473     }
 474     if (chosen) {
 475         pe_node_t *local_node = parent_node_instance(rsc, chosen);
 476 
 477         if (local_node) {
 478             local_node->count++;
 479 
 480         } else if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
 481             /* what to do? we can't enforce per-node limits in this case */
 482             pcmk__config_err("%s not found in %s (list of %d)",
 483                              chosen->details->id, rsc->parent->id,
 484                              g_hash_table_size(rsc->parent->allowed_nodes));
 485         }
 486     }
 487 
 488     if(backup) {
 489         g_hash_table_destroy(backup);
 490     }
 491     return chosen;
 492 }
 493 
 494 static void
 495 append_parent_colocation(pe_resource_t * rsc, pe_resource_t * child, gboolean all)
     /* [previous][next][first][last][top][bottom][index][help] */
 496 {
 497 
 498     GListPtr gIter = NULL;
 499 
 500     gIter = rsc->rsc_cons;
 501     for (; gIter != NULL; gIter = gIter->next) {
 502         rsc_colocation_t *cons = (rsc_colocation_t *) gIter->data;
 503 
 504         if (cons->score == 0) {
 505             continue;
 506         }
 507         if (all || cons->score < 0 || cons->score == INFINITY) {
 508             child->rsc_cons = g_list_prepend(child->rsc_cons, cons);
 509         }
 510     }
 511 
 512     gIter = rsc->rsc_cons_lhs;
 513     for (; gIter != NULL; gIter = gIter->next) {
 514         rsc_colocation_t *cons = (rsc_colocation_t *) gIter->data;
 515 
 516         if (cons->score == 0) {
 517             continue;
 518         }
 519         if (all || cons->score < 0) {
 520             child->rsc_cons_lhs = g_list_prepend(child->rsc_cons_lhs, cons);
 521         }
 522     }
 523 }
 524 
 525 
 526 void
 527 distribute_children(pe_resource_t *rsc, GListPtr children, GListPtr nodes,
 528                     int max, int per_host_max, pe_working_set_t * data_set);
 529 
 530 void
 531 distribute_children(pe_resource_t *rsc, GListPtr children, GListPtr nodes,
     /* [previous][next][first][last][top][bottom][index][help] */
 532                     int max, int per_host_max, pe_working_set_t * data_set) 
 533 {
 534     int loop_max = 0;
 535     int allocated = 0;
 536     int available_nodes = 0;
 537 
 538     /* count now tracks the number of clones currently allocated */
 539     for(GListPtr nIter = nodes; nIter != NULL; nIter = nIter->next) {
 540         pe_node_t *node = nIter->data;
 541 
 542         node->count = 0;
 543         if (can_run_resources(node)) {
 544             available_nodes++;
 545         }
 546     }
 547 
 548     if(available_nodes) {
 549         loop_max = max / available_nodes;
 550     }
 551     if (loop_max < 1) {
 552         loop_max = 1;
 553     }
 554 
 555     pe_rsc_debug(rsc, "Allocating up to %d %s instances to a possible %d nodes (at most %d per host, %d optimal)",
 556                  max, rsc->id, available_nodes, per_host_max, loop_max);
 557 
 558     /* Pre-allocate as many instances as we can to their current location */
 559     for (GListPtr gIter = children; gIter != NULL && allocated < max; gIter = gIter->next) {
 560         pe_resource_t *child = (pe_resource_t *) gIter->data;
 561 
 562         if (child->running_on && pcmk_is_set(child->flags, pe_rsc_provisional)
 563             && !pcmk_is_set(child->flags, pe_rsc_failed)) {
 564             pe_node_t *child_node = pe__current_node(child);
 565             pe_node_t *local_node = parent_node_instance(child, child_node);
 566 
 567             pe_rsc_trace(rsc, "Checking pre-allocation of %s to %s (%d remaining of %d)",
 568                          child->id, child_node->details->uname, max - allocated, max);
 569 
 570             if (can_run_resources(child_node) == FALSE || child_node->weight < 0) {
 571                 pe_rsc_trace(rsc, "Not pre-allocating because %s can not run %s",
 572                              child_node->details->uname, child->id);
 573 
 574             } else if(local_node && local_node->count >= loop_max) {
 575                 pe_rsc_trace(rsc,
 576                              "Not pre-allocating because %s already allocated optimal instances",
 577                              child_node->details->uname);
 578 
 579             } else if (allocate_instance(child, child_node,
 580                                          max < available_nodes, per_host_max,
 581                                          data_set)) {
 582                 pe_rsc_trace(rsc, "Pre-allocated %s to %s", child->id,
 583                              child_node->details->uname);
 584                 allocated++;
 585             }
 586         }
 587     }
 588 
 589     pe_rsc_trace(rsc, "Done pre-allocating (%d of %d)", allocated, max);
 590 
 591     for (GListPtr gIter = children; gIter != NULL; gIter = gIter->next) {
 592         pe_resource_t *child = (pe_resource_t *) gIter->data;
 593 
 594         if (child->running_on != NULL) {
 595             pe_node_t *child_node = pe__current_node(child);
 596             pe_node_t *local_node = parent_node_instance(child, child_node);
 597 
 598             if (local_node == NULL) {
 599                 crm_err("%s is running on %s which isn't allowed",
 600                         child->id, child_node->details->uname);
 601             }
 602         }
 603 
 604         if (!pcmk_is_set(child->flags, pe_rsc_provisional)) {
 605         } else if (allocated >= max) {
 606             pe_rsc_debug(rsc, "Child %s not allocated - limit reached %d %d", child->id, allocated, max);
 607             resource_location(child, NULL, -INFINITY, "clone:limit_reached", data_set);
 608         } else {
 609             if (allocate_instance(child, NULL, max < available_nodes,
 610                                   per_host_max, data_set)) {
 611                 allocated++;
 612             }
 613         }
 614     }
 615 
 616     pe_rsc_debug(rsc, "Allocated %d %s instances of a possible %d",
 617                  allocated, rsc->id, max);
 618 }
 619 
 620 
 621 pe_node_t *
 622 pcmk__clone_allocate(pe_resource_t *rsc, pe_node_t *prefer,
     /* [previous][next][first][last][top][bottom][index][help] */
 623                      pe_working_set_t *data_set)
 624 {
 625     GListPtr nodes = NULL;
 626     clone_variant_data_t *clone_data = NULL;
 627 
 628     get_clone_variant_data(clone_data, rsc);
 629 
 630     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
 631         return NULL;
 632 
 633     } else if (pcmk_is_set(rsc->flags, pe_rsc_allocating)) {
 634         pe_rsc_debug(rsc, "Dependency loop detected involving %s", rsc->id);
 635         return NULL;
 636     }
 637 
 638     if (pcmk_is_set(rsc->flags, pe_rsc_promotable)) {
 639         apply_master_prefs(rsc);
 640     }
 641 
 642     pe__set_resource_flags(rsc, pe_rsc_allocating);
 643 
 644     /* this information is used by sort_clone_instance() when deciding in which 
 645      * order to allocate clone instances
 646      */
 647     for (GListPtr gIter = rsc->rsc_cons; gIter != NULL; gIter = gIter->next) {
 648         rsc_colocation_t *constraint = (rsc_colocation_t *) gIter->data;
 649 
 650         if (constraint->score == 0) {
 651             continue;
 652         }
 653         pe_rsc_trace(rsc, "%s: Allocating %s first",
 654                      rsc->id, constraint->rsc_rh->id);
 655         constraint->rsc_rh->cmds->allocate(constraint->rsc_rh, prefer, data_set);
 656     }
 657 
 658     for (GListPtr gIter = rsc->rsc_cons_lhs; gIter != NULL; gIter = gIter->next) {
 659         rsc_colocation_t *constraint = (rsc_colocation_t *) gIter->data;
 660 
 661         if (constraint->score == 0) {
 662             continue;
 663         }
 664         rsc->allowed_nodes =
 665             constraint->rsc_lh->cmds->merge_weights(constraint->rsc_lh, rsc->id, rsc->allowed_nodes,
 666                                                     constraint->node_attribute,
 667                                                     (float)constraint->score / INFINITY,
 668                                                     (pe_weights_rollback | pe_weights_positive));
 669     }
 670 
 671     pe__show_node_weights(!show_scores, rsc, __func__, rsc->allowed_nodes);
 672 
 673     nodes = g_hash_table_get_values(rsc->allowed_nodes);
 674     nodes = sort_nodes_by_weight(nodes, NULL, data_set);
 675     rsc->children = g_list_sort_with_data(rsc->children, sort_clone_instance, data_set);
 676     distribute_children(rsc, rsc->children, nodes, clone_data->clone_max, clone_data->clone_node_max, data_set);
 677     g_list_free(nodes);
 678 
 679     if (pcmk_is_set(rsc->flags, pe_rsc_promotable)) {
 680         pcmk__set_instance_roles(rsc, data_set);
 681     }
 682 
 683     pe__clear_resource_flags(rsc, pe_rsc_provisional|pe_rsc_allocating);
 684     pe_rsc_trace(rsc, "Done allocating %s", rsc->id);
 685     return NULL;
 686 }
 687 
 688 static void
 689 clone_update_pseudo_status(pe_resource_t * rsc, gboolean * stopping, gboolean * starting,
     /* [previous][next][first][last][top][bottom][index][help] */
 690                            gboolean * active)
 691 {
 692     GListPtr gIter = NULL;
 693 
 694     if (rsc->children) {
 695 
 696         gIter = rsc->children;
 697         for (; gIter != NULL; gIter = gIter->next) {
 698             pe_resource_t *child = (pe_resource_t *) gIter->data;
 699 
 700             clone_update_pseudo_status(child, stopping, starting, active);
 701         }
 702 
 703         return;
 704     }
 705 
 706     CRM_ASSERT(active != NULL);
 707     CRM_ASSERT(starting != NULL);
 708     CRM_ASSERT(stopping != NULL);
 709 
 710     if (rsc->running_on) {
 711         *active = TRUE;
 712     }
 713 
 714     gIter = rsc->actions;
 715     for (; gIter != NULL; gIter = gIter->next) {
 716         pe_action_t *action = (pe_action_t *) gIter->data;
 717 
 718         if (*starting && *stopping) {
 719             return;
 720 
 721         } else if (pcmk_is_set(action->flags, pe_action_optional)) {
 722             pe_rsc_trace(rsc, "Skipping optional: %s", action->uuid);
 723             continue;
 724 
 725         } else if (!pcmk_any_flags_set(action->flags,
 726                                        pe_action_pseudo|pe_action_runnable)) {
 727             pe_rsc_trace(rsc, "Skipping unrunnable: %s", action->uuid);
 728             continue;
 729 
 730         } else if (pcmk__str_eq(RSC_STOP, action->task, pcmk__str_casei)) {
 731             pe_rsc_trace(rsc, "Stopping due to: %s", action->uuid);
 732             *stopping = TRUE;
 733 
 734         } else if (pcmk__str_eq(RSC_START, action->task, pcmk__str_casei)) {
 735             if (!pcmk_is_set(action->flags, pe_action_runnable)) {
 736                 pe_rsc_trace(rsc, "Skipping pseudo-op: %s run=%d, pseudo=%d",
 737                              action->uuid,
 738                              pcmk_is_set(action->flags, pe_action_runnable),
 739                              pcmk_is_set(action->flags, pe_action_pseudo));
 740             } else {
 741                 pe_rsc_trace(rsc, "Starting due to: %s", action->uuid);
 742                 pe_rsc_trace(rsc, "%s run=%d, pseudo=%d",
 743                              action->uuid,
 744                              pcmk_is_set(action->flags, pe_action_runnable),
 745                              pcmk_is_set(action->flags, pe_action_pseudo));
 746                 *starting = TRUE;
 747             }
 748         }
 749     }
 750 }
 751 
 752 static pe_action_t *
 753 find_rsc_action(pe_resource_t *rsc, const char *task, gboolean active_only,
     /* [previous][next][first][last][top][bottom][index][help] */
 754                 GList **list)
 755 {
 756     pe_action_t *match = NULL;
 757     GListPtr possible = NULL;
 758     GListPtr active = NULL;
 759 
 760     possible = pe__resource_actions(rsc, NULL, task, FALSE);
 761 
 762     if (active_only) {
 763         GListPtr gIter = possible;
 764 
 765         for (; gIter != NULL; gIter = gIter->next) {
 766             pe_action_t *op = (pe_action_t *) gIter->data;
 767 
 768             if (!pcmk_is_set(op->flags, pe_action_optional)) {
 769                 active = g_list_prepend(active, op);
 770             }
 771         }
 772 
 773         if (active && pcmk__list_of_1(active)) {
 774             match = g_list_nth_data(active, 0);
 775         }
 776 
 777         if (list) {
 778             *list = active;
 779             active = NULL;
 780         }
 781 
 782     } else if (possible && pcmk__list_of_1(possible)) {
 783         match = g_list_nth_data(possible, 0);
 784 
 785     }
 786     if (list) {
 787         *list = possible;
 788         possible = NULL;
 789     }
 790 
 791     if (possible) {
 792         g_list_free(possible);
 793     }
 794     if (active) {
 795         g_list_free(active);
 796     }
 797 
 798     return match;
 799 }
 800 
 801 static void
 802 child_ordering_constraints(pe_resource_t * rsc, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 803 {
 804     pe_action_t *stop = NULL;
 805     pe_action_t *start = NULL;
 806     pe_action_t *last_stop = NULL;
 807     pe_action_t *last_start = NULL;
 808     GListPtr gIter = NULL;
 809     gboolean active_only = TRUE;        /* change to false to get the old behavior */
 810     clone_variant_data_t *clone_data = NULL;
 811 
 812     get_clone_variant_data(clone_data, rsc);
 813 
 814     if (clone_data->ordered == FALSE) {
 815         return;
 816     }
 817     /* we have to maintain a consistent sorted child list when building order constraints */
 818     rsc->children = g_list_sort(rsc->children, sort_rsc_id);
 819 
 820     for (gIter = rsc->children; gIter != NULL; gIter = gIter->next) {
 821         pe_resource_t *child = (pe_resource_t *) gIter->data;
 822 
 823         stop = find_rsc_action(child, RSC_STOP, active_only, NULL);
 824         if (stop) {
 825             if (last_stop) {
 826                 /* child/child relative stop */
 827                 order_actions(stop, last_stop, pe_order_optional);
 828             }
 829             last_stop = stop;
 830         }
 831 
 832         start = find_rsc_action(child, RSC_START, active_only, NULL);
 833         if (start) {
 834             if (last_start) {
 835                 /* child/child relative start */
 836                 order_actions(last_start, start, pe_order_optional);
 837             }
 838             last_start = start;
 839         }
 840     }
 841 }
 842 
 843 void
 844 clone_create_actions(pe_resource_t *rsc, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 845 {
 846     clone_variant_data_t *clone_data = NULL;
 847 
 848     get_clone_variant_data(clone_data, rsc);
 849     clone_create_pseudo_actions(rsc, rsc->children, &clone_data->start_notify, &clone_data->stop_notify,data_set);
 850     child_ordering_constraints(rsc, data_set);
 851     if (pcmk_is_set(rsc->flags, pe_rsc_promotable)) {
 852         create_promotable_actions(rsc, data_set);
 853     }
 854 }
 855 
 856 void
 857 clone_create_pseudo_actions(
     /* [previous][next][first][last][top][bottom][index][help] */
 858     pe_resource_t * rsc, GListPtr children, notify_data_t **start_notify, notify_data_t **stop_notify,  pe_working_set_t * data_set)
 859 {
 860     gboolean child_active = FALSE;
 861     gboolean child_starting = FALSE;
 862     gboolean child_stopping = FALSE;
 863     gboolean allow_dependent_migrations = TRUE;
 864 
 865     pe_action_t *stop = NULL;
 866     pe_action_t *stopped = NULL;
 867 
 868     pe_action_t *start = NULL;
 869     pe_action_t *started = NULL;
 870 
 871     pe_rsc_trace(rsc, "Creating actions for %s", rsc->id);
 872 
 873     for (GListPtr gIter = children; gIter != NULL; gIter = gIter->next) {
 874         pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
 875         gboolean starting = FALSE;
 876         gboolean stopping = FALSE;
 877 
 878         child_rsc->cmds->create_actions(child_rsc, data_set);
 879         clone_update_pseudo_status(child_rsc, &stopping, &starting, &child_active);
 880         if (stopping && starting) {
 881             allow_dependent_migrations = FALSE;
 882         }
 883 
 884         child_stopping |= stopping;
 885         child_starting |= starting;
 886     }
 887 
 888     /* start */
 889     start = create_pseudo_resource_op(rsc, RSC_START, !child_starting, TRUE, data_set);
 890     started = create_pseudo_resource_op(rsc, RSC_STARTED, !child_starting, FALSE, data_set);
 891     started->priority = INFINITY;
 892 
 893     if (child_active || child_starting) {
 894         update_action_flags(started, pe_action_runnable, __func__, __LINE__);
 895     }
 896 
 897     if (start_notify != NULL && *start_notify == NULL) {
 898         *start_notify = create_notification_boundaries(rsc, RSC_START, start, started, data_set);
 899     }
 900 
 901     /* stop */
 902     stop = create_pseudo_resource_op(rsc, RSC_STOP, !child_stopping, TRUE, data_set);
 903     stopped = create_pseudo_resource_op(rsc, RSC_STOPPED, !child_stopping, TRUE, data_set);
 904     stopped->priority = INFINITY;
 905     if (allow_dependent_migrations) {
 906         update_action_flags(stop, pe_action_migrate_runnable, __func__,
 907                             __LINE__);
 908     }
 909 
 910     if (stop_notify != NULL && *stop_notify == NULL) {
 911         *stop_notify = create_notification_boundaries(rsc, RSC_STOP, stop, stopped, data_set);
 912 
 913         if (start_notify && *start_notify && *stop_notify) {
 914             order_actions((*stop_notify)->post_done, (*start_notify)->pre, pe_order_optional);
 915         }
 916     }
 917 }
 918 
 919 void
 920 clone_internal_constraints(pe_resource_t *rsc, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 921 {
 922     pe_resource_t *last_rsc = NULL;
 923     GListPtr gIter;
 924     clone_variant_data_t *clone_data = NULL;
 925 
 926     get_clone_variant_data(clone_data, rsc);
 927 
 928     pe_rsc_trace(rsc, "Internal constraints for %s", rsc->id);
 929     new_rsc_order(rsc, RSC_STOPPED, rsc, RSC_START, pe_order_optional, data_set);
 930     new_rsc_order(rsc, RSC_START, rsc, RSC_STARTED, pe_order_runnable_left, data_set);
 931     new_rsc_order(rsc, RSC_STOP, rsc, RSC_STOPPED, pe_order_runnable_left, data_set);
 932 
 933     if (pcmk_is_set(rsc->flags, pe_rsc_promotable)) {
 934         new_rsc_order(rsc, RSC_DEMOTED, rsc, RSC_STOP, pe_order_optional, data_set);
 935         new_rsc_order(rsc, RSC_STARTED, rsc, RSC_PROMOTE, pe_order_runnable_left, data_set);
 936     }
 937 
 938     if (clone_data->ordered) {
 939         /* we have to maintain a consistent sorted child list when building order constraints */
 940         rsc->children = g_list_sort(rsc->children, sort_rsc_id);
 941     }
 942     for (gIter = rsc->children; gIter != NULL; gIter = gIter->next) {
 943         pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
 944 
 945         child_rsc->cmds->internal_constraints(child_rsc, data_set);
 946 
 947         order_start_start(rsc, child_rsc, pe_order_runnable_left | pe_order_implies_first_printed);
 948         new_rsc_order(child_rsc, RSC_START, rsc, RSC_STARTED, pe_order_implies_then_printed,
 949                       data_set);
 950         if (clone_data->ordered && last_rsc) {
 951             order_start_start(last_rsc, child_rsc, pe_order_optional);
 952         }
 953 
 954         order_stop_stop(rsc, child_rsc, pe_order_implies_first_printed);
 955         new_rsc_order(child_rsc, RSC_STOP, rsc, RSC_STOPPED, pe_order_implies_then_printed,
 956                       data_set);
 957         if (clone_data->ordered && last_rsc) {
 958             order_stop_stop(child_rsc, last_rsc, pe_order_optional);
 959         }
 960 
 961         last_rsc = child_rsc;
 962     }
 963     if (pcmk_is_set(rsc->flags, pe_rsc_promotable)) {
 964         promotable_constraints(rsc, data_set);
 965     }
 966 }
 967 
 968 bool
 969 assign_node(pe_resource_t * rsc, pe_node_t * node, gboolean force)
     /* [previous][next][first][last][top][bottom][index][help] */
 970 {
 971     bool changed = FALSE;
 972 
 973     if (rsc->children) {
 974 
 975         for (GListPtr gIter = rsc->children; gIter != NULL; gIter = gIter->next) {
 976             pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
 977 
 978             changed |= assign_node(child_rsc, node, force);
 979         }
 980 
 981         return changed;
 982     }
 983 
 984     if (rsc->allocated_to != NULL) {
 985         changed = true;
 986     }
 987 
 988     native_assign_node(rsc, NULL, node, force);
 989     return changed;
 990 }
 991 
 992 gboolean
 993 is_child_compatible(pe_resource_t *child_rsc, pe_node_t * local_node, enum rsc_role_e filter, gboolean current) 
     /* [previous][next][first][last][top][bottom][index][help] */
 994 {
 995     pe_node_t *node = NULL;
 996     enum rsc_role_e next_role = child_rsc->fns->state(child_rsc, current);
 997 
 998     CRM_CHECK(child_rsc && local_node, return FALSE);
 999     if (is_set_recursive(child_rsc, pe_rsc_block, TRUE) == FALSE) {
1000         /* We only want instances that haven't failed */
1001         node = child_rsc->fns->location(child_rsc, NULL, current);
1002     }
1003 
1004     if (filter != RSC_ROLE_UNKNOWN && next_role != filter) {
1005         crm_trace("Filtered %s", child_rsc->id);
1006         return FALSE;
1007     }
1008 
1009     if (node && (node->details == local_node->details)) {
1010         return TRUE;
1011 
1012     } else if (node) {
1013         crm_trace("%s - %s vs %s", child_rsc->id, node->details->uname,
1014                   local_node->details->uname);
1015 
1016     } else {
1017         crm_trace("%s - not allocated %d", child_rsc->id, current);
1018     }
1019     return FALSE;
1020 }
1021 
1022 pe_resource_t *
1023 find_compatible_child(pe_resource_t *local_child, pe_resource_t *rsc,
     /* [previous][next][first][last][top][bottom][index][help] */
1024                       enum rsc_role_e filter, gboolean current,
1025                       pe_working_set_t *data_set)
1026 {
1027     pe_resource_t *pair = NULL;
1028     GListPtr gIter = NULL;
1029     GListPtr scratch = NULL;
1030     pe_node_t *local_node = NULL;
1031 
1032     local_node = local_child->fns->location(local_child, NULL, current);
1033     if (local_node) {
1034         return find_compatible_child_by_node(local_child, local_node, rsc, filter, current);
1035     }
1036 
1037     scratch = g_hash_table_get_values(local_child->allowed_nodes);
1038     scratch = sort_nodes_by_weight(scratch, NULL, data_set);
1039 
1040     gIter = scratch;
1041     for (; gIter != NULL; gIter = gIter->next) {
1042         pe_node_t *node = (pe_node_t *) gIter->data;
1043 
1044         pair = find_compatible_child_by_node(local_child, node, rsc, filter, current);
1045         if (pair) {
1046             goto done;
1047         }
1048     }
1049 
1050     pe_rsc_debug(rsc, "Can't pair %s with %s", local_child->id, rsc->id);
1051   done:
1052     g_list_free(scratch);
1053     return pair;
1054 }
1055 
1056 void
1057 clone_rsc_colocation_lh(pe_resource_t *rsc_lh, pe_resource_t *rsc_rh,
     /* [previous][next][first][last][top][bottom][index][help] */
1058                         rsc_colocation_t *constraint,
1059                         pe_working_set_t *data_set)
1060 {
1061     /* -- Never called --
1062      *
1063      * Instead we add the colocation constraints to the child and call from there
1064      */
1065     CRM_ASSERT(FALSE);
1066 }
1067 
1068 void
1069 clone_rsc_colocation_rh(pe_resource_t *rsc_lh, pe_resource_t *rsc_rh,
     /* [previous][next][first][last][top][bottom][index][help] */
1070                         rsc_colocation_t *constraint,
1071                         pe_working_set_t *data_set)
1072 {
1073     GListPtr gIter = NULL;
1074     gboolean do_interleave = FALSE;
1075     const char *interleave_s = NULL;
1076 
1077     CRM_CHECK(constraint != NULL, return);
1078     CRM_CHECK(rsc_lh != NULL, pe_err("rsc_lh was NULL for %s", constraint->id); return);
1079     CRM_CHECK(rsc_rh != NULL, pe_err("rsc_rh was NULL for %s", constraint->id); return);
1080     CRM_CHECK(rsc_lh->variant == pe_native, return);
1081 
1082     if (constraint->score == 0) {
1083         return;
1084     }
1085     pe_rsc_trace(rsc_rh, "Processing constraint %s: %s -> %s %d",
1086                  constraint->id, rsc_lh->id, rsc_rh->id, constraint->score);
1087 
1088     if (pcmk_is_set(rsc_rh->flags, pe_rsc_promotable)) {
1089         if (pcmk_is_set(rsc_rh->flags, pe_rsc_provisional)) {
1090             pe_rsc_trace(rsc_rh, "%s is still provisional", rsc_rh->id);
1091             return;
1092         } else if (constraint->role_rh == RSC_ROLE_UNKNOWN) {
1093             pe_rsc_trace(rsc_rh, "Handling %s as a clone colocation", constraint->id);
1094         } else {
1095             promotable_colocation_rh(rsc_lh, rsc_rh, constraint, data_set);
1096             return;
1097         }
1098     }
1099 
1100     /* only the LHS side needs to be labeled as interleave */
1101     interleave_s = g_hash_table_lookup(constraint->rsc_lh->meta, XML_RSC_ATTR_INTERLEAVE);
1102     if(crm_is_true(interleave_s) && constraint->rsc_lh->variant > pe_group) {
1103         // TODO: Do we actually care about multiple RH copies sharing a LH copy anymore?
1104         if (copies_per_node(constraint->rsc_lh) != copies_per_node(constraint->rsc_rh)) {
1105             pcmk__config_err("Cannot interleave %s and %s because they do not "
1106                              "support the same number of instances per node",
1107                              constraint->rsc_lh->id, constraint->rsc_rh->id);
1108 
1109         } else {
1110             do_interleave = TRUE;
1111         }
1112     }
1113 
1114     if (pcmk_is_set(rsc_rh->flags, pe_rsc_provisional)) {
1115         pe_rsc_trace(rsc_rh, "%s is still provisional", rsc_rh->id);
1116         return;
1117 
1118     } else if (do_interleave) {
1119         pe_resource_t *rh_child = NULL;
1120 
1121         rh_child = find_compatible_child(rsc_lh, rsc_rh, RSC_ROLE_UNKNOWN,
1122                                          FALSE, data_set);
1123 
1124         if (rh_child) {
1125             pe_rsc_debug(rsc_rh, "Pairing %s with %s", rsc_lh->id, rh_child->id);
1126             rsc_lh->cmds->rsc_colocation_lh(rsc_lh, rh_child, constraint,
1127                                             data_set);
1128 
1129         } else if (constraint->score >= INFINITY) {
1130             crm_notice("Cannot pair %s with instance of %s", rsc_lh->id, rsc_rh->id);
1131             assign_node(rsc_lh, NULL, TRUE);
1132 
1133         } else {
1134             pe_rsc_debug(rsc_rh, "Cannot pair %s with instance of %s", rsc_lh->id, rsc_rh->id);
1135         }
1136 
1137         return;
1138 
1139     } else if (constraint->score >= INFINITY) {
1140         GListPtr rhs = NULL;
1141 
1142         gIter = rsc_rh->children;
1143         for (; gIter != NULL; gIter = gIter->next) {
1144             pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
1145             pe_node_t *chosen = child_rsc->fns->location(child_rsc, NULL, FALSE);
1146 
1147             if (chosen != NULL && is_set_recursive(child_rsc, pe_rsc_block, TRUE) == FALSE) {
1148                 pe_rsc_trace(rsc_rh, "Allowing %s: %s %d", constraint->id, chosen->details->uname, chosen->weight);
1149                 rhs = g_list_prepend(rhs, chosen);
1150             }
1151         }
1152 
1153         node_list_exclude(rsc_lh->allowed_nodes, rhs, FALSE);
1154         g_list_free(rhs);
1155         return;
1156     }
1157 
1158     gIter = rsc_rh->children;
1159     for (; gIter != NULL; gIter = gIter->next) {
1160         pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
1161 
1162         child_rsc->cmds->rsc_colocation_rh(rsc_lh, child_rsc, constraint,
1163                                            data_set);
1164     }
1165 }
1166 
1167 enum action_tasks
1168 clone_child_action(pe_action_t * action)
     /* [previous][next][first][last][top][bottom][index][help] */
1169 {
1170     enum action_tasks result = no_action;
1171     pe_resource_t *child = (pe_resource_t *) action->rsc->children->data;
1172 
1173     if (pcmk__strcase_any_of(action->task, "notify", "notified", NULL)) {
1174 
1175         /* Find the action we're notifying about instead */
1176 
1177         int stop = 0;
1178         char *key = action->uuid;
1179         int lpc = strlen(key);
1180 
1181         for (; lpc > 0; lpc--) {
1182             if (key[lpc] == '_' && stop == 0) {
1183                 stop = lpc;
1184 
1185             } else if (key[lpc] == '_') {
1186                 char *task_mutable = NULL;
1187 
1188                 lpc++;
1189                 task_mutable = strdup(key + lpc);
1190                 task_mutable[stop - lpc] = 0;
1191 
1192                 crm_trace("Extracted action '%s' from '%s'", task_mutable, key);
1193                 result = get_complex_task(child, task_mutable, TRUE);
1194                 free(task_mutable);
1195                 break;
1196             }
1197         }
1198 
1199     } else {
1200         result = get_complex_task(child, action->task, TRUE);
1201     }
1202     return result;
1203 }
1204 
/* Clear flag bit(s) in a flag group variable via pcmk__clear_flags_as(),
 * passing LOG_TRACE as the log level and the action's resource ID as the
 * target.
 *
 * flags:  lvalue holding the flag group (read and reassigned)
 * action: pointer expression to an action with a valid rsc
 * flag:   flag(s) to clear (also stringified for the log)
 *
 * Every parameter is parenthesized so that expression arguments (such as
 * "array + 1" or "a | b") expand correctly.
 */
#define pe__clear_action_summary_flags(flags, action, flag) do {            \
        (flags) = pcmk__clear_flags_as(__func__, __LINE__, LOG_TRACE,       \
                                       "Action summary",                    \
                                       (action)->rsc->id,                   \
                                       (flags), (flag), #flag);             \
    } while (0)
1210 
1211 enum pe_action_flags
1212 summary_action_flags(pe_action_t * action, GListPtr children, pe_node_t * node)
     /* [previous][next][first][last][top][bottom][index][help] */
1213 {
1214     GListPtr gIter = NULL;
1215     gboolean any_runnable = FALSE;
1216     gboolean check_runnable = TRUE;
1217     enum action_tasks task = clone_child_action(action);
1218     enum pe_action_flags flags = (pe_action_optional | pe_action_runnable | pe_action_pseudo);
1219     const char *task_s = task2text(task);
1220 
1221     for (gIter = children; gIter != NULL; gIter = gIter->next) {
1222         pe_action_t *child_action = NULL;
1223         pe_resource_t *child = (pe_resource_t *) gIter->data;
1224 
1225         child_action = find_first_action(child->actions, NULL, task_s, child->children ? NULL : node);
1226         pe_rsc_trace(action->rsc, "Checking for %s in %s on %s (%s)", task_s, child->id,
1227                      node ? node->details->uname : "none", child_action?child_action->uuid:"NA");
1228         if (child_action) {
1229             enum pe_action_flags child_flags = child->cmds->action_flags(child_action, node);
1230 
1231             if (pcmk_is_set(flags, pe_action_optional)
1232                 && !pcmk_is_set(child_flags, pe_action_optional)) {
1233                 pe_rsc_trace(child, "%s is mandatory because of %s", action->uuid,
1234                              child_action->uuid);
1235                 pe__clear_action_summary_flags(flags, action, pe_action_optional);
1236                 pe__clear_action_flags(action, pe_action_optional);
1237             }
1238             if (pcmk_is_set(child_flags, pe_action_runnable)) {
1239                 any_runnable = TRUE;
1240             }
1241         }
1242     }
1243 
1244     if (check_runnable && any_runnable == FALSE) {
1245         pe_rsc_trace(action->rsc, "%s is not runnable because no children are", action->uuid);
1246         pe__clear_action_summary_flags(flags, action, pe_action_runnable);
1247         if (node == NULL) {
1248             pe__clear_action_flags(action, pe_action_runnable);
1249         }
1250     }
1251 
1252     return flags;
1253 }
1254 
1255 enum pe_action_flags
1256 clone_action_flags(pe_action_t * action, pe_node_t * node)
     /* [previous][next][first][last][top][bottom][index][help] */
1257 {
1258     return summary_action_flags(action, action->rsc->children, node);
1259 }
1260 
1261 void
1262 clone_rsc_location(pe_resource_t *rsc, pe__location_t *constraint)
     /* [previous][next][first][last][top][bottom][index][help] */
1263 {
1264     GListPtr gIter = rsc->children;
1265 
1266     pe_rsc_trace(rsc, "Processing location constraint %s for %s", constraint->id, rsc->id);
1267 
1268     native_rsc_location(rsc, constraint);
1269 
1270     for (; gIter != NULL; gIter = gIter->next) {
1271         pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
1272 
1273         child_rsc->cmds->rsc_location(child_rsc, constraint);
1274     }
1275 }
1276 
1277 void
1278 clone_expand(pe_resource_t * rsc, pe_working_set_t * data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1279 {
1280     GListPtr gIter = NULL;
1281     clone_variant_data_t *clone_data = NULL;
1282 
1283     get_clone_variant_data(clone_data, rsc);
1284 
1285     gIter = rsc->actions;
1286     for (; gIter != NULL; gIter = gIter->next) {
1287         pe_action_t *op = (pe_action_t *) gIter->data;
1288 
1289         rsc->cmds->action_flags(op, NULL);
1290     }
1291 
1292     if (clone_data->start_notify) {
1293         collect_notification_data(rsc, TRUE, TRUE, clone_data->start_notify);
1294         pcmk__create_notification_keys(rsc, clone_data->start_notify, data_set);
1295         create_notifications(rsc, clone_data->start_notify, data_set);
1296     }
1297 
1298     if (clone_data->stop_notify) {
1299         collect_notification_data(rsc, TRUE, TRUE, clone_data->stop_notify);
1300         pcmk__create_notification_keys(rsc, clone_data->stop_notify, data_set);
1301         create_notifications(rsc, clone_data->stop_notify, data_set);
1302     }
1303 
1304     if (clone_data->promote_notify) {
1305         collect_notification_data(rsc, TRUE, TRUE, clone_data->promote_notify);
1306         pcmk__create_notification_keys(rsc, clone_data->promote_notify, data_set);
1307         create_notifications(rsc, clone_data->promote_notify, data_set);
1308     }
1309 
1310     if (clone_data->demote_notify) {
1311         collect_notification_data(rsc, TRUE, TRUE, clone_data->demote_notify);
1312         pcmk__create_notification_keys(rsc, clone_data->demote_notify, data_set);
1313         create_notifications(rsc, clone_data->demote_notify, data_set);
1314     }
1315 
1316     /* Now that the notifcations have been created we can expand the children */
1317 
1318     gIter = rsc->children;
1319     for (; gIter != NULL; gIter = gIter->next) {
1320         pe_resource_t *child_rsc = (pe_resource_t *) gIter->data;
1321 
1322         child_rsc->cmds->expand(child_rsc, data_set);
1323     }
1324 
1325     native_expand(rsc, data_set);
1326 
1327     /* The notifications are in the graph now, we can destroy the notify_data */
1328     free_notification_data(clone_data->demote_notify);
1329     clone_data->demote_notify = NULL;
1330     free_notification_data(clone_data->stop_notify);
1331     clone_data->stop_notify = NULL;
1332     free_notification_data(clone_data->start_notify);
1333     clone_data->start_notify = NULL;
1334     free_notification_data(clone_data->promote_notify);
1335     clone_data->promote_notify = NULL;
1336 }
1337 
1338 // Check whether a resource or any of its children is known on node
1339 static bool
1340 rsc_known_on(const pe_resource_t *rsc, const pe_node_t *node)
     /* [previous][next][first][last][top][bottom][index][help] */
1341 {
1342     if (rsc->children) {
1343         for (GList *child_iter = rsc->children; child_iter != NULL;
1344              child_iter = child_iter->next) {
1345 
1346             pe_resource_t *child = (pe_resource_t *) child_iter->data;
1347 
1348             if (rsc_known_on(child, node)) {
1349                 return TRUE;
1350             }
1351         }
1352 
1353     } else if (rsc->known_on) {
1354         GHashTableIter iter;
1355         pe_node_t *known_node = NULL;
1356 
1357         g_hash_table_iter_init(&iter, rsc->known_on);
1358         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &known_node)) {
1359             if (node->details == known_node->details) {
1360                 return TRUE;
1361             }
1362         }
1363     }
1364     return FALSE;
1365 }
1366 
1367 // Look for an instance of clone that is known on node
1368 static pe_resource_t *
1369 find_instance_on(const pe_resource_t *clone, const pe_node_t *node)
     /* [previous][next][first][last][top][bottom][index][help] */
1370 {
1371     for (GList *gIter = clone->children; gIter != NULL; gIter = gIter->next) {
1372         pe_resource_t *child = (pe_resource_t *) gIter->data;
1373 
1374         if (rsc_known_on(child, node)) {
1375             return child;
1376         }
1377     }
1378     return NULL;
1379 }
1380 
1381 // For unique clones, probe each instance separately
1382 static gboolean
1383 probe_unique_clone(pe_resource_t *rsc, pe_node_t *node, pe_action_t *complete,
     /* [previous][next][first][last][top][bottom][index][help] */
1384                    gboolean force, pe_working_set_t *data_set)
1385 {
1386     gboolean any_created = FALSE;
1387 
1388     for (GList *child_iter = rsc->children; child_iter != NULL;
1389          child_iter = child_iter->next) {
1390 
1391         pe_resource_t *child = (pe_resource_t *) child_iter->data;
1392 
1393         any_created |= child->cmds->create_probe(child, node, complete, force,
1394                                                  data_set);
1395     }
1396     return any_created;
1397 }
1398 
1399 // For anonymous clones, only a single instance needs to be probed
1400 static gboolean
1401 probe_anonymous_clone(pe_resource_t *rsc, pe_node_t *node,
     /* [previous][next][first][last][top][bottom][index][help] */
1402                       pe_action_t *complete, gboolean force,
1403                       pe_working_set_t *data_set)
1404 {
1405     // First, check if we probed an instance on this node last time
1406     pe_resource_t *child = find_instance_on(rsc, node);
1407 
1408     // Otherwise, check if we plan to start an instance on this node
1409     if (child == NULL) {
1410         for (GList *child_iter = rsc->children; child_iter && !child;
1411              child_iter = child_iter->next) {
1412 
1413             pe_node_t *local_node = NULL;
1414             pe_resource_t *child_rsc = (pe_resource_t *) child_iter->data;
1415 
1416             if (child_rsc) { /* make clang analyzer happy */
1417                 local_node = child_rsc->fns->location(child_rsc, NULL, FALSE);
1418                 if (local_node && (local_node->details == node->details)) {
1419                     child = child_rsc;
1420                 }
1421             }
1422         }
1423     }
1424 
1425     // Otherwise, use the first clone instance
1426     if (child == NULL) {
1427         child = rsc->children->data;
1428     }
1429     CRM_ASSERT(child);
1430     return child->cmds->create_probe(child, node, complete, force, data_set);
1431 }
1432 
1433 gboolean
1434 clone_create_probe(pe_resource_t * rsc, pe_node_t * node, pe_action_t * complete,
     /* [previous][next][first][last][top][bottom][index][help] */
1435                    gboolean force, pe_working_set_t * data_set)
1436 {
1437     gboolean any_created = FALSE;
1438 
1439     CRM_ASSERT(rsc);
1440 
1441     rsc->children = g_list_sort(rsc->children, sort_rsc_id);
1442     if (rsc->children == NULL) {
1443         pe_warn("Clone %s has no children", rsc->id);
1444         return FALSE;
1445     }
1446 
1447     if (rsc->exclusive_discover) {
1448         pe_node_t *allowed = g_hash_table_lookup(rsc->allowed_nodes, node->details->id);
1449         if (allowed && allowed->rsc_discover_mode != pe_discover_exclusive) {
1450             /* exclusive discover is enabled and this node is not marked
1451              * as a node this resource should be discovered on
1452              *
1453              * remove the node from allowed_nodes so that the
1454              * notification contains only nodes that we might ever run
1455              * on
1456              */
1457             g_hash_table_remove(rsc->allowed_nodes, node->details->id);
1458 
1459             /* Bit of a shortcut - might as well take it */
1460             return FALSE;
1461         }
1462     }
1463 
1464     if (pcmk_is_set(rsc->flags, pe_rsc_unique)) {
1465         any_created = probe_unique_clone(rsc, node, complete, force, data_set);
1466     } else {
1467         any_created = probe_anonymous_clone(rsc, node, complete, force,
1468                                             data_set);
1469     }
1470     return any_created;
1471 }
1472 
1473 void
1474 clone_append_meta(pe_resource_t * rsc, xmlNode * xml)
     /* [previous][next][first][last][top][bottom][index][help] */
1475 {
1476     char *name = NULL;
1477     clone_variant_data_t *clone_data = NULL;
1478 
1479     get_clone_variant_data(clone_data, rsc);
1480 
1481     name = crm_meta_name(XML_RSC_ATTR_UNIQUE);
1482     crm_xml_add(xml, name, pe__rsc_bool_str(rsc, pe_rsc_unique));
1483     free(name);
1484 
1485     name = crm_meta_name(XML_RSC_ATTR_NOTIFY);
1486     crm_xml_add(xml, name, pe__rsc_bool_str(rsc, pe_rsc_notify));
1487     free(name);
1488 
1489     name = crm_meta_name(XML_RSC_ATTR_INCARNATION_MAX);
1490     crm_xml_add_int(xml, name, clone_data->clone_max);
1491     free(name);
1492 
1493     name = crm_meta_name(XML_RSC_ATTR_INCARNATION_NODEMAX);
1494     crm_xml_add_int(xml, name, clone_data->clone_node_max);
1495     free(name);
1496 
1497     if (pcmk_is_set(rsc->flags, pe_rsc_promotable)) {
1498         name = crm_meta_name(XML_RSC_ATTR_PROMOTED_MAX);
1499         crm_xml_add_int(xml, name, clone_data->promoted_max);
1500         free(name);
1501 
1502         name = crm_meta_name(XML_RSC_ATTR_PROMOTED_NODEMAX);
1503         crm_xml_add_int(xml, name, clone_data->promoted_node_max);
1504         free(name);
1505 
1506         /* @COMPAT Maintain backward compatibility with resource agents that
1507          * expect the old names (deprecated since 2.0.0).
1508          */
1509         name = crm_meta_name(XML_RSC_ATTR_MASTER_MAX);
1510         crm_xml_add_int(xml, name, clone_data->promoted_max);
1511         free(name);
1512 
1513         name = crm_meta_name(XML_RSC_ATTR_MASTER_NODEMAX);
1514         crm_xml_add_int(xml, name, clone_data->promoted_node_max);
1515         free(name);
1516     }
1517 }

/* [previous][next][first][last][top][bottom][index][help] */