root/attrd/main.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. attrd_cpg_dispatch
  2. attrd_cpg_destroy
  3. attrd_cib_replaced_cb
  4. attrd_cib_destroy_cb
  5. attrd_erase_cb
  6. attrd_erase_attrs
  7. attrd_cib_connect
  8. attrd_ipc_dispatch
  9. attrd_cluster_connect
  10. main

   1 /*
   2  * Copyright (C) 2013 Andrew Beekhof <andrew@beekhof.net>
   3  *
   4  * This program is free software; you can redistribute it and/or
   5  * modify it under the terms of the GNU General Public
   6  * License as published by the Free Software Foundation; either
   7  * version 2 of the License, or (at your option) any later version.
   8  *
   9  * This software is distributed in the hope that it will be useful,
  10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  12  * General Public License for more details.
  13  *
  14  * You should have received a copy of the GNU General Public
  15  * License along with this library; if not, write to the Free Software
  16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  17  */
  18 
  19 #include <crm_internal.h>
  20 
  21 #include <sys/param.h>
  22 #include <stdio.h>
  23 #include <sys/types.h>
  24 #include <sys/stat.h>
  25 #include <unistd.h>
  26 
  27 #include <stdlib.h>
  28 #include <errno.h>
  29 #include <fcntl.h>
  30 
  31 #include <crm/crm.h>
  32 #include <crm/cib/internal.h>
  33 #include <crm/msg_xml.h>
  34 #include <crm/pengine/rules.h>
  35 #include <crm/common/iso8601.h>
  36 #include <crm/common/ipc.h>
  37 #include <crm/common/ipcs.h>
  38 #include <crm/cluster/internal.h>
  39 #include <crm/cluster/election.h>
  40 
  41 #include <crm/common/xml.h>
  42 
  43 #include <crm/attrd.h>
  44 #include <internal.h>
  45 
  46 lrmd_t *the_lrmd = NULL;
  47 crm_cluster_t *attrd_cluster = NULL;
  48 election_t *writer = NULL;
  49 crm_trigger_t *attrd_config_read = NULL;
  50 static int attrd_exit_status = pcmk_ok;
  51 
  52 static void
  53 attrd_cpg_dispatch(cpg_handle_t handle,
     /* [previous][next][first][last][top][bottom][index][help] */
  54                  const struct cpg_name *groupName,
  55                  uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
  56 {
  57     uint32_t kind = 0;
  58     xmlNode *xml = NULL;
  59     const char *from = NULL;
  60     char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
  61 
  62     if(data == NULL) {
  63         return;
  64     }
  65 
  66     if (kind == crm_class_cluster) {
  67         xml = string2xml(data);
  68     }
  69 
  70     if (xml == NULL) {
  71         crm_err("Bad message of class %d received from %s[%u]: '%.120s'", kind, from, nodeid, data);
  72     } else {
  73         crm_node_t *peer = crm_get_peer(nodeid, from);
  74 
  75         attrd_peer_message(peer, xml);
  76     }
  77 
  78     free_xml(xml);
  79     free(data);
  80 }
  81 
  82 static void
  83 attrd_cpg_destroy(gpointer unused)
     /* [previous][next][first][last][top][bottom][index][help] */
  84 {
  85     if (attrd_shutting_down()) {
  86         crm_info("Corosync disconnection complete");
  87 
  88     } else {
  89         crm_crit("Lost connection to Corosync service!");
  90         attrd_exit_status = ECONNRESET;
  91         attrd_shutdown(0);
  92     }
  93 }
  94 
  95 static void
  96 attrd_cib_replaced_cb(const char *event, xmlNode * msg)
     /* [previous][next][first][last][top][bottom][index][help] */
  97 {
  98     crm_notice("Updating all attributes after %s event", event);
  99     if(election_state(writer) == election_won) {
 100         write_attributes(TRUE);
 101     }
 102 }
 103 
 104 static void
 105 attrd_cib_destroy_cb(gpointer user_data)
     /* [previous][next][first][last][top][bottom][index][help] */
 106 {
 107     cib_t *conn = user_data;
 108 
 109     conn->cmds->signoff(conn);  /* Ensure IPC is cleaned up */
 110 
 111     if (attrd_shutting_down()) {
 112         crm_info("Connection disconnection complete");
 113 
 114     } else {
 115         /* eventually this should trigger a reconnect, not a shutdown */
 116         crm_err("Lost connection to CIB service!");
 117         attrd_exit_status = ECONNRESET;
 118         attrd_shutdown(0);
 119     }
 120 
 121     return;
 122 }
 123 
 124 static void
 125 attrd_erase_cb(xmlNode *msg, int call_id, int rc, xmlNode *output,
     /* [previous][next][first][last][top][bottom][index][help] */
 126                void *user_data)
 127 {
 128     do_crm_log_unlikely((rc? LOG_NOTICE : LOG_DEBUG),
 129                         "Cleared transient attributes: %s "
 130                         CRM_XS " xpath=%s rc=%d",
 131                         pcmk_strerror(rc), (char *) user_data, rc);
 132 }
 133 
 134 #define XPATH_TRANSIENT "//node_state[@uname='%s']/" XML_TAG_TRANSIENT_NODEATTRS
 135 
 136 /*!
 137  * \internal
 138  * \brief Wipe all transient attributes for this node from the CIB
 139  *
 140  * Clear any previous transient node attributes from the CIB. This is
 141  * normally done by the DC's crmd when this node leaves the cluster, but
 142  * this handles the case where the node restarted so quickly that the
 143  * cluster layer didn't notice.
 144  *
 145  * \todo If attrd respawns after crashing (see PCMK_respawned), ideally we'd
 146  *       skip this and sync our attributes from the writer. However, currently
 147  *       we reject any values for us that the writer has, in
 148  *       attrd_peer_update().
 149  */
 150 static void
 151 attrd_erase_attrs()
     /* [previous][next][first][last][top][bottom][index][help] */
 152 {
 153     int call_id;
 154     char *xpath = crm_strdup_printf(XPATH_TRANSIENT, attrd_cluster->uname);
 155 
 156     crm_info("Clearing transient attributes from CIB " CRM_XS " xpath=%s",
 157              xpath);
 158 
 159     call_id = the_cib->cmds->delete(the_cib, xpath, NULL,
 160                                     cib_quorum_override | cib_xpath);
 161     the_cib->cmds->register_callback_full(the_cib, call_id, 120, FALSE, xpath,
 162                                           "attrd_erase_cb", attrd_erase_cb,
 163                                           free);
 164 }
 165 
 166 static int
 167 attrd_cib_connect(int max_retry)
     /* [previous][next][first][last][top][bottom][index][help] */
 168 {
 169     static int attempts = 0;
 170 
 171     int rc = -ENOTCONN;
 172 
 173     the_cib = cib_new();
 174     if (the_cib == NULL) {
 175         return DAEMON_RESPAWN_STOP;
 176     }
 177 
 178     do {
 179         if(attempts > 0) {
 180             sleep(attempts);
 181         }
 182 
 183         attempts++;
 184         crm_debug("CIB signon attempt %d", attempts);
 185         rc = the_cib->cmds->signon(the_cib, T_ATTRD, cib_command);
 186 
 187     } while(rc != pcmk_ok && attempts < max_retry);
 188 
 189     if (rc != pcmk_ok) {
 190         crm_err("Signon to CIB failed: %s (%d)", pcmk_strerror(rc), rc);
 191         goto cleanup;
 192     }
 193 
 194     crm_debug("Connected to the CIB after %d attempts", attempts);
 195 
 196     rc = the_cib->cmds->set_connection_dnotify(the_cib, attrd_cib_destroy_cb);
 197     if (rc != pcmk_ok) {
 198         crm_err("Could not set disconnection callback");
 199         goto cleanup;
 200     }
 201 
 202     rc = the_cib->cmds->add_notify_callback(the_cib, T_CIB_REPLACE_NOTIFY, attrd_cib_replaced_cb);
 203     if(rc != pcmk_ok) {
 204         crm_err("Could not set CIB notification callback");
 205         goto cleanup;
 206     }
 207 
 208     rc = the_cib->cmds->add_notify_callback(the_cib, T_CIB_DIFF_NOTIFY, attrd_cib_updated_cb);
 209     if (rc != pcmk_ok) {
 210         crm_err("Could not set CIB notification callback (update)");
 211         goto cleanup;
 212     }
 213 
 214     // We have no attribute values in memory, wipe the CIB to match
 215     attrd_erase_attrs();
 216 
 217     // Set a trigger for reading the CIB (for the alerts section)
 218     attrd_config_read = mainloop_add_trigger(G_PRIORITY_HIGH, attrd_read_options, NULL);
 219 
 220     // Always read the CIB at start-up
 221     mainloop_set_trigger(attrd_config_read);
 222 
 223     return pcmk_ok;
 224 
 225   cleanup:
 226     the_cib->cmds->signoff(the_cib);
 227     cib_delete(the_cib);
 228     the_cib = NULL;
 229     return DAEMON_RESPAWN_STOP;
 230 }
 231 
 232 static int32_t
 233 attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size)
     /* [previous][next][first][last][top][bottom][index][help] */
 234 {
 235     uint32_t id = 0;
 236     uint32_t flags = 0;
 237     crm_client_t *client = crm_client_get(c);
 238     xmlNode *xml = crm_ipcs_recv(client, data, size, &id, &flags);
 239     const char *op;
 240 
 241     if (xml == NULL) {
 242         crm_debug("No msg from %d (%p)", crm_ipcs_client_pid(c), c);
 243         return 0;
 244     }
 245 #if ENABLE_ACL
 246     CRM_ASSERT(client->user != NULL);
 247     crm_acl_get_set_user(xml, F_ATTRD_USER, client->user);
 248 #endif
 249 
 250     crm_trace("Processing msg from %d (%p)", crm_ipcs_client_pid(c), c);
 251     crm_log_xml_trace(xml, __FUNCTION__);
 252 
 253     op = crm_element_value(xml, F_ATTRD_TASK);
 254 
 255     if (client->name == NULL) {
 256         const char *value = crm_element_value(xml, F_ORIG);
 257         client->name = crm_strdup_printf("%s.%d", value?value:"unknown", client->pid);
 258     }
 259 
 260     if (safe_str_eq(op, ATTRD_OP_PEER_REMOVE)) {
 261         attrd_send_ack(client, id, flags);
 262         attrd_client_peer_remove(client->name, xml);
 263 
 264     } else if (safe_str_eq(op, ATTRD_OP_CLEAR_FAILURE)) {
 265         attrd_send_ack(client, id, flags);
 266         attrd_client_clear_failure(xml);
 267 
 268     } else if (safe_str_eq(op, ATTRD_OP_UPDATE)) {
 269         attrd_send_ack(client, id, flags);
 270         attrd_client_update(xml);
 271 
 272     } else if (safe_str_eq(op, ATTRD_OP_UPDATE_BOTH)) {
 273         attrd_send_ack(client, id, flags);
 274         attrd_client_update(xml);
 275 
 276     } else if (safe_str_eq(op, ATTRD_OP_UPDATE_DELAY)) {
 277         attrd_send_ack(client, id, flags);
 278         attrd_client_update(xml);
 279   
 280     } else if (safe_str_eq(op, ATTRD_OP_REFRESH)) {
 281         attrd_send_ack(client, id, flags);
 282         attrd_client_refresh();
 283 
 284     } else if (safe_str_eq(op, ATTRD_OP_QUERY)) {
 285         /* queries will get reply, so no ack is necessary */
 286         attrd_client_query(client, id, flags, xml);
 287 
 288     } else {
 289         crm_info("Ignoring request from client %s with unknown operation %s",
 290                  client->name, op);
 291     }
 292 
 293     free_xml(xml);
 294     return 0;
 295 }
 296 
 297 static int
 298 attrd_cluster_connect()
     /* [previous][next][first][last][top][bottom][index][help] */
 299 {
 300     attrd_cluster = calloc(1, sizeof(crm_cluster_t));
 301 
 302     attrd_cluster->destroy = attrd_cpg_destroy;
 303     attrd_cluster->cpg.cpg_deliver_fn = attrd_cpg_dispatch;
 304     attrd_cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;
 305 
 306     crm_set_status_callback(&attrd_peer_change_cb);
 307 
 308     if (crm_cluster_connect(attrd_cluster) == FALSE) {
 309         crm_err("Cluster connection failed");
 310         return DAEMON_RESPAWN_STOP;
 311     }
 312     return pcmk_ok;
 313 }
 314 
 315 /* *INDENT-OFF* */
 316 static struct crm_option long_options[] = {
 317     /* Top-level Options */
 318     {"help",    0, 0, '?', "\tThis text"},
 319     {"verbose", 0, 0, 'V', "\tIncrease debug output"},
 320 
 321     {0, 0, 0, 0}
 322 };
 323 /* *INDENT-ON* */
 324 
 325 int
 326 main(int argc, char **argv)
     /* [previous][next][first][last][top][bottom][index][help] */
 327 {
 328     int flag = 0;
 329     int index = 0;
 330     int argerr = 0;
 331     qb_ipcs_service_t *ipcs = NULL;
 332 
 333     attrd_init_mainloop();
 334     crm_log_preinit(NULL, argc, argv);
 335     crm_set_options(NULL, "[options]", long_options,
 336                     "Daemon for aggregating and atomically storing node attribute updates into the CIB");
 337 
 338     mainloop_add_signal(SIGTERM, attrd_shutdown);
 339 
 340      while (1) {
 341         flag = crm_get_option(argc, argv, &index);
 342         if (flag == -1)
 343             break;
 344 
 345         switch (flag) {
 346             case 'V':
 347                 crm_bump_log_level(argc, argv);
 348                 break;
 349             case 'h':          /* Help message */
 350                 crm_help(flag, EX_OK);
 351                 break;
 352             default:
 353                 ++argerr;
 354                 break;
 355         }
 356     }
 357 
 358     if (optind > argc) {
 359         ++argerr;
 360     }
 361 
 362     if (argerr) {
 363         crm_help('?', EX_USAGE);
 364     }
 365 
 366     crm_log_init(T_ATTRD, LOG_INFO, TRUE, FALSE, argc, argv, FALSE);
 367     crm_info("Starting up");
 368     attributes = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_attribute);
 369 
 370     attrd_exit_status = attrd_cluster_connect();
 371     if (attrd_exit_status != pcmk_ok) {
 372         goto done;
 373     }
 374     crm_info("Cluster connection active");
 375 
 376     attrd_exit_status = attrd_cib_connect(10);
 377     if (attrd_exit_status != pcmk_ok) {
 378         goto done;
 379     }
 380     crm_info("CIB connection active");
 381 
 382     writer = election_init(T_ATTRD, attrd_cluster->uname, 120000, attrd_election_cb);
 383     attrd_init_ipc(&ipcs, attrd_ipc_dispatch);
 384     crm_info("Accepting attribute updates");
 385 
 386     attrd_run_mainloop();
 387 
 388   done:
 389     crm_info("Shutting down attribute manager");
 390 
 391     election_fini(writer);
 392     if (ipcs) {
 393         crm_client_disconnect_all(ipcs);
 394         qb_ipcs_destroy(ipcs);
 395         g_hash_table_destroy(attributes);
 396     }
 397 
 398     attrd_lrmd_disconnect();
 399     attrd_cib_disconnect();
 400 
 401     return crm_exit(attrd_exit_status);
 402 }

/* [previous][next][first][last][top][bottom][index][help] */