This source file includes the following definitions.
- attrd_cpg_dispatch
- attrd_cpg_destroy
- attrd_cib_replaced_cb
- attrd_cib_destroy_cb
- attrd_erase_cb
- attrd_erase_attrs
- attrd_cib_connect
- attrd_ipc_dispatch
- attrd_cluster_connect
- main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <crm_internal.h>
20
21 #include <sys/param.h>
22 #include <stdio.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <unistd.h>
26
27 #include <stdlib.h>
28 #include <errno.h>
29 #include <fcntl.h>
30
31 #include <crm/crm.h>
32 #include <crm/cib/internal.h>
33 #include <crm/msg_xml.h>
34 #include <crm/pengine/rules.h>
35 #include <crm/common/iso8601.h>
36 #include <crm/common/ipc.h>
37 #include <crm/common/ipcs.h>
38 #include <crm/cluster/internal.h>
39 #include <crm/cluster/election.h>
40
41 #include <crm/common/xml.h>
42
43 #include <crm/attrd.h>
44 #include <internal.h>
45
/* LRMD connection handle; it is not touched by the code in this file
 * (presumably managed by the attrd LRMD helpers -- TODO confirm) */
lrmd_t *the_lrmd = NULL;
/* Cluster-layer connection object, allocated in attrd_cluster_connect() */
crm_cluster_t *attrd_cluster = NULL;
/* Election object created in main(); its state decides whether this
 * instance writes attributes after a CIB replace event */
election_t *writer = NULL;
/* Mainloop trigger that runs attrd_read_options(), created once the CIB
 * connection is established */
crm_trigger_t *attrd_config_read = NULL;
/* Exit status handed to crm_exit() by main(); set to ECONNRESET when the
 * Corosync or CIB connection is lost unexpectedly */
static int attrd_exit_status = pcmk_ok;
51
52 static void
53 attrd_cpg_dispatch(cpg_handle_t handle,
54 const struct cpg_name *groupName,
55 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
56 {
57 uint32_t kind = 0;
58 xmlNode *xml = NULL;
59 const char *from = NULL;
60 char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
61
62 if(data == NULL) {
63 return;
64 }
65
66 if (kind == crm_class_cluster) {
67 xml = string2xml(data);
68 }
69
70 if (xml == NULL) {
71 crm_err("Bad message of class %d received from %s[%u]: '%.120s'", kind, from, nodeid, data);
72 } else {
73 crm_node_t *peer = crm_get_peer(nodeid, from);
74
75 attrd_peer_message(peer, xml);
76 }
77
78 free_xml(xml);
79 free(data);
80 }
81
82 static void
83 attrd_cpg_destroy(gpointer unused)
84 {
85 if (attrd_shutting_down()) {
86 crm_info("Corosync disconnection complete");
87
88 } else {
89 crm_crit("Lost connection to Corosync service!");
90 attrd_exit_status = ECONNRESET;
91 attrd_shutdown(0);
92 }
93 }
94
95 static void
96 attrd_cib_replaced_cb(const char *event, xmlNode * msg)
97 {
98 crm_notice("Updating all attributes after %s event", event);
99 if(election_state(writer) == election_won) {
100 write_attributes(TRUE);
101 }
102 }
103
104 static void
105 attrd_cib_destroy_cb(gpointer user_data)
106 {
107 cib_t *conn = user_data;
108
109 conn->cmds->signoff(conn);
110
111 if (attrd_shutting_down()) {
112 crm_info("Connection disconnection complete");
113
114 } else {
115
116 crm_err("Lost connection to CIB service!");
117 attrd_exit_status = ECONNRESET;
118 attrd_shutdown(0);
119 }
120
121 return;
122 }
123
124 static void
125 attrd_erase_cb(xmlNode *msg, int call_id, int rc, xmlNode *output,
126 void *user_data)
127 {
128 do_crm_log_unlikely((rc? LOG_NOTICE : LOG_DEBUG),
129 "Cleared transient attributes: %s "
130 CRM_XS " xpath=%s rc=%d",
131 pcmk_strerror(rc), (char *) user_data, rc);
132 }
133
134 #define XPATH_TRANSIENT "//node_state[@uname='%s']/" XML_TAG_TRANSIENT_NODEATTRS
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 static void
151 attrd_erase_attrs()
152 {
153 int call_id;
154 char *xpath = crm_strdup_printf(XPATH_TRANSIENT, attrd_cluster->uname);
155
156 crm_info("Clearing transient attributes from CIB " CRM_XS " xpath=%s",
157 xpath);
158
159 call_id = the_cib->cmds->delete(the_cib, xpath, NULL,
160 cib_quorum_override | cib_xpath);
161 the_cib->cmds->register_callback_full(the_cib, call_id, 120, FALSE, xpath,
162 "attrd_erase_cb", attrd_erase_cb,
163 free);
164 }
165
166 static int
167 attrd_cib_connect(int max_retry)
168 {
169 static int attempts = 0;
170
171 int rc = -ENOTCONN;
172
173 the_cib = cib_new();
174 if (the_cib == NULL) {
175 return DAEMON_RESPAWN_STOP;
176 }
177
178 do {
179 if(attempts > 0) {
180 sleep(attempts);
181 }
182
183 attempts++;
184 crm_debug("CIB signon attempt %d", attempts);
185 rc = the_cib->cmds->signon(the_cib, T_ATTRD, cib_command);
186
187 } while(rc != pcmk_ok && attempts < max_retry);
188
189 if (rc != pcmk_ok) {
190 crm_err("Signon to CIB failed: %s (%d)", pcmk_strerror(rc), rc);
191 goto cleanup;
192 }
193
194 crm_debug("Connected to the CIB after %d attempts", attempts);
195
196 rc = the_cib->cmds->set_connection_dnotify(the_cib, attrd_cib_destroy_cb);
197 if (rc != pcmk_ok) {
198 crm_err("Could not set disconnection callback");
199 goto cleanup;
200 }
201
202 rc = the_cib->cmds->add_notify_callback(the_cib, T_CIB_REPLACE_NOTIFY, attrd_cib_replaced_cb);
203 if(rc != pcmk_ok) {
204 crm_err("Could not set CIB notification callback");
205 goto cleanup;
206 }
207
208 rc = the_cib->cmds->add_notify_callback(the_cib, T_CIB_DIFF_NOTIFY, attrd_cib_updated_cb);
209 if (rc != pcmk_ok) {
210 crm_err("Could not set CIB notification callback (update)");
211 goto cleanup;
212 }
213
214
215 attrd_erase_attrs();
216
217
218 attrd_config_read = mainloop_add_trigger(G_PRIORITY_HIGH, attrd_read_options, NULL);
219
220
221 mainloop_set_trigger(attrd_config_read);
222
223 return pcmk_ok;
224
225 cleanup:
226 the_cib->cmds->signoff(the_cib);
227 cib_delete(the_cib);
228 the_cib = NULL;
229 return DAEMON_RESPAWN_STOP;
230 }
231
232 static int32_t
233 attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size)
234 {
235 uint32_t id = 0;
236 uint32_t flags = 0;
237 crm_client_t *client = crm_client_get(c);
238 xmlNode *xml = crm_ipcs_recv(client, data, size, &id, &flags);
239 const char *op;
240
241 if (xml == NULL) {
242 crm_debug("No msg from %d (%p)", crm_ipcs_client_pid(c), c);
243 return 0;
244 }
245 #if ENABLE_ACL
246 CRM_ASSERT(client->user != NULL);
247 crm_acl_get_set_user(xml, F_ATTRD_USER, client->user);
248 #endif
249
250 crm_trace("Processing msg from %d (%p)", crm_ipcs_client_pid(c), c);
251 crm_log_xml_trace(xml, __FUNCTION__);
252
253 op = crm_element_value(xml, F_ATTRD_TASK);
254
255 if (client->name == NULL) {
256 const char *value = crm_element_value(xml, F_ORIG);
257 client->name = crm_strdup_printf("%s.%d", value?value:"unknown", client->pid);
258 }
259
260 if (safe_str_eq(op, ATTRD_OP_PEER_REMOVE)) {
261 attrd_send_ack(client, id, flags);
262 attrd_client_peer_remove(client->name, xml);
263
264 } else if (safe_str_eq(op, ATTRD_OP_CLEAR_FAILURE)) {
265 attrd_send_ack(client, id, flags);
266 attrd_client_clear_failure(xml);
267
268 } else if (safe_str_eq(op, ATTRD_OP_UPDATE)) {
269 attrd_send_ack(client, id, flags);
270 attrd_client_update(xml);
271
272 } else if (safe_str_eq(op, ATTRD_OP_UPDATE_BOTH)) {
273 attrd_send_ack(client, id, flags);
274 attrd_client_update(xml);
275
276 } else if (safe_str_eq(op, ATTRD_OP_UPDATE_DELAY)) {
277 attrd_send_ack(client, id, flags);
278 attrd_client_update(xml);
279
280 } else if (safe_str_eq(op, ATTRD_OP_REFRESH)) {
281 attrd_send_ack(client, id, flags);
282 attrd_client_refresh();
283
284 } else if (safe_str_eq(op, ATTRD_OP_QUERY)) {
285
286 attrd_client_query(client, id, flags, xml);
287
288 } else {
289 crm_info("Ignoring request from client %s with unknown operation %s",
290 client->name, op);
291 }
292
293 free_xml(xml);
294 return 0;
295 }
296
297 static int
298 attrd_cluster_connect()
299 {
300 attrd_cluster = calloc(1, sizeof(crm_cluster_t));
301
302 attrd_cluster->destroy = attrd_cpg_destroy;
303 attrd_cluster->cpg.cpg_deliver_fn = attrd_cpg_dispatch;
304 attrd_cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;
305
306 crm_set_status_callback(&attrd_peer_change_cb);
307
308 if (crm_cluster_connect(attrd_cluster) == FALSE) {
309 crm_err("Cluster connection failed");
310 return DAEMON_RESPAWN_STOP;
311 }
312 return pcmk_ok;
313 }
314
315
/*
 * Command-line options registered with crm_set_options() in main().
 * The all-zero entry closes the table (presumably the terminator the
 * option parser looks for -- TODO confirm).
 *
 * NOTE(review): the help entry carries '?', while main()'s option switch
 * tests for 'h' -- confirm which value crm_get_option() reports for --help.
 */
static struct crm_option long_options[] = {
    /* Top-level options */
    {"help", 0, 0, '?', "\tThis text"},
    {"verbose", 0, 0, 'V', "\tIncrease debug output"},

    {0, 0, 0, 0}
};
323
324
325 int
326 main(int argc, char **argv)
327 {
328 int flag = 0;
329 int index = 0;
330 int argerr = 0;
331 qb_ipcs_service_t *ipcs = NULL;
332
333 attrd_init_mainloop();
334 crm_log_preinit(NULL, argc, argv);
335 crm_set_options(NULL, "[options]", long_options,
336 "Daemon for aggregating and atomically storing node attribute updates into the CIB");
337
338 mainloop_add_signal(SIGTERM, attrd_shutdown);
339
340 while (1) {
341 flag = crm_get_option(argc, argv, &index);
342 if (flag == -1)
343 break;
344
345 switch (flag) {
346 case 'V':
347 crm_bump_log_level(argc, argv);
348 break;
349 case 'h':
350 crm_help(flag, EX_OK);
351 break;
352 default:
353 ++argerr;
354 break;
355 }
356 }
357
358 if (optind > argc) {
359 ++argerr;
360 }
361
362 if (argerr) {
363 crm_help('?', EX_USAGE);
364 }
365
366 crm_log_init(T_ATTRD, LOG_INFO, TRUE, FALSE, argc, argv, FALSE);
367 crm_info("Starting up");
368 attributes = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_attribute);
369
370 attrd_exit_status = attrd_cluster_connect();
371 if (attrd_exit_status != pcmk_ok) {
372 goto done;
373 }
374 crm_info("Cluster connection active");
375
376 attrd_exit_status = attrd_cib_connect(10);
377 if (attrd_exit_status != pcmk_ok) {
378 goto done;
379 }
380 crm_info("CIB connection active");
381
382 writer = election_init(T_ATTRD, attrd_cluster->uname, 120000, attrd_election_cb);
383 attrd_init_ipc(&ipcs, attrd_ipc_dispatch);
384 crm_info("Accepting attribute updates");
385
386 attrd_run_mainloop();
387
388 done:
389 crm_info("Shutting down attribute manager");
390
391 election_fini(writer);
392 if (ipcs) {
393 crm_client_disconnect_all(ipcs);
394 qb_ipcs_destroy(ipcs);
395 g_hash_table_destroy(attributes);
396 }
397
398 attrd_lrmd_disconnect();
399 attrd_cib_disconnect();
400
401 return crm_exit(attrd_exit_status);
402 }