This source file includes following definitions.
- get_process_list
- pcmk_get_handler_ver0
- register_this_component
- plugin_has_quorum
- update_expected_votes
- process_ais_conf
- pcmk_config_init
- pcmk_wait_dispatch
- pcmk_update_nodeid
- build_path
- pcmk_startup
- totempg_ifaces_print
- ais_mark_unseen_peer_dead
- pcmk_peer_update
- pcmk_ipc_exit
- pcmk_ipc_connect
- pcmk_cluster_swab
- pcmk_cluster_callback
- pcmk_cluster_id_swab
- pcmk_cluster_id_callback
- send_ipc_ack
- pcmk_ipc
- pcmk_shutdown
- member_vote_count_fn
- member_loop_fn
- pcmk_generate_membership_data
- pcmk_nodes
- pcmk_remove_member
- send_quorum_details
- pcmk_quorum
- pcmk_notify
- pcmk_nodeid
- ghash_send_update
- send_member_notification
- check_message_sanity
- deliver_transient_msg
- route_ais_message
- send_plugin_msg_raw
- send_cluster_id
- ghash_send_removal
- ais_remove_peer
- ais_remove_peer_by_name
- process_ais_message
- member_dump_fn
- pcmk_exec_dump
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <crm_internal.h>
20 #include <crm/cluster/internal.h>
21 #include <sys/types.h>
22 #include <sys/uio.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <netinet/in.h>
26 #include <arpa/inet.h>
27 #include <unistd.h>
28 #include <fcntl.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <string.h>
34
35 #include <corosync/totem/totempg.h>
36 #include <corosync/engine/objdb.h>
37 #include <corosync/engine/config.h>
38
39 #include <config.h>
40 #include "plugin.h"
41 #include "utils.h"
42
43 #include <glib.h>
44
45 #include <sys/resource.h>
46 #include <sys/utsname.h>
47 #include <sys/socket.h>
48 #include <sys/wait.h>
49 #include <sys/stat.h>
50 #include <pthread.h>
51 #include <bzlib.h>
52 #include <pwd.h>
53
54 struct corosync_api_v1 *pcmk_api = NULL;
55
56 uint32_t plugin_has_votes = 0;
57 uint32_t plugin_expected_votes = 2;
58
59 int use_mgmtd = 0;
60 int plugin_log_level = LOG_DEBUG;
61 char *local_uname = NULL;
62 int local_uname_len = 0;
63 char *local_cname = NULL;
64 int local_cname_len = 0;
65 uint32_t local_nodeid = 0;
66 char *ipc_channel_name = NULL;
67 static uint64_t local_born_on = 0;
68
69 uint64_t membership_seq = 0;
70 pthread_t pcmk_wait_thread;
71
72 gboolean use_mcp = FALSE;
73 gboolean wait_active = TRUE;
74 gboolean have_reliable_membership_id = FALSE;
75 GHashTable *ipc_client_list = NULL;
76 GHashTable *membership_list = NULL;
77 GHashTable *membership_notify_list = NULL;
78
79 #define MAX_RESPAWN 100
80 #define LOOPBACK_ID 16777343
81 #define crm_flag_none 0x00000000
82 #define crm_flag_members 0x00000001
83
84 struct crm_identify_msg_s {
85 cs_ipc_header_request_t header __attribute__ ((aligned(8)));
86 uint32_t id;
87 uint32_t pid;
88 int32_t votes;
89 uint32_t processes;
90 char uname[256];
91 char version[256];
92 uint64_t born_on;
93 } __attribute__ ((packed));
94
95
96 static crm_child_t pcmk_children[] = {
97 { 0, crm_proc_none, crm_flag_none, 0, 0, FALSE, "none", NULL, NULL, NULL, NULL },
98 { 0, crm_proc_plugin, crm_flag_none, 0, 0, FALSE, "ais", NULL, NULL, NULL, NULL },
99 { 0, crm_proc_lrmd, crm_flag_none, 3, 0, TRUE, "lrmd", NULL, CRM_DAEMON_DIR"/lrmd", NULL, NULL },
100 { 0, crm_proc_cib, crm_flag_members, 1, 0, TRUE, "cib", CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib", NULL, NULL },
101 { 0, crm_proc_crmd, crm_flag_members, 6, 0, TRUE, "crmd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd", NULL, NULL },
102 { 0, crm_proc_attrd, crm_flag_none, 4, 0, TRUE, "attrd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd", NULL, NULL },
103 { 0, crm_proc_stonithd, crm_flag_none, 0, 0, TRUE, "stonithd", NULL, "/bin/false", NULL, NULL },
104 { 0, crm_proc_pe, crm_flag_none, 5, 0, TRUE, "pengine", CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine", NULL, NULL },
105 { 0, crm_proc_mgmtd, crm_flag_none, 7, 0, TRUE, "mgmtd", NULL, HB_DAEMON_DIR"/mgmtd", NULL, NULL },
106 { 0, crm_proc_stonith_ng, crm_flag_members, 2, 0, TRUE, "stonith-ng", NULL, CRM_DAEMON_DIR"/stonithd", NULL, NULL },
107 };
108
109
110 void send_cluster_id(void);
111 int send_plugin_msg_raw(const AIS_Message * ais_msg);
112 char *pcmk_generate_membership_data(void);
113 gboolean check_message_sanity(const AIS_Message * msg, const char *data);
114
115 typedef const void ais_void_ptr;
116 int pcmk_shutdown(void);
117 void pcmk_peer_update(enum totem_configuration_type configuration_type,
118 const unsigned int *member_list, size_t member_list_entries,
119 const unsigned int *left_list, size_t left_list_entries,
120 const unsigned int *joined_list, size_t joined_list_entries,
121 const struct memb_ring_id *ring_id);
122
123 int pcmk_startup(struct corosync_api_v1 *corosync_api);
124 int pcmk_config_init(struct corosync_api_v1 *corosync_api);
125
126 int pcmk_ipc_exit(void *conn);
127 int pcmk_ipc_connect(void *conn);
128 void pcmk_ipc(void *conn, ais_void_ptr * msg);
129
130 void pcmk_exec_dump(void);
131 void pcmk_cluster_swab(void *msg);
132 void pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid);
133
134 void pcmk_nodeid(void *conn, ais_void_ptr * msg);
135 void pcmk_nodes(void *conn, ais_void_ptr * msg);
136 void pcmk_notify(void *conn, ais_void_ptr * msg);
137 void pcmk_remove_member(void *conn, ais_void_ptr * msg);
138 void pcmk_quorum(void *conn, ais_void_ptr * msg);
139
140 void pcmk_cluster_id_swab(void *msg);
141 void pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid);
142 void ais_remove_peer(char *node_id);
143 void ais_remove_peer_by_name(const char *node_name);
144
145 static uint32_t
146 get_process_list(void)
147 {
148 int lpc = 0;
149 uint32_t procs = crm_proc_plugin;
150
151 if (use_mcp) {
152 return 0;
153 }
154
155 for (lpc = 0; lpc < SIZEOF(pcmk_children); lpc++) {
156 if (pcmk_children[lpc].pid != 0) {
157 procs |= pcmk_children[lpc].flag;
158 }
159 }
160 return procs;
161 }
162
163 static struct corosync_lib_handler pcmk_lib_service[] = {
164 {
165 .lib_handler_fn = pcmk_ipc,
166 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
167 },
168 {
169 .lib_handler_fn = pcmk_nodes,
170 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
171 },
172 {
173 .lib_handler_fn = pcmk_notify,
174 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
175 },
176 {
177 .lib_handler_fn = pcmk_nodeid,
178 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
179 },
180 {
181 .lib_handler_fn = pcmk_remove_member,
182 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
183 },
184 {
185 .lib_handler_fn = pcmk_quorum,
186 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
187 },
188 };
189
190 static struct corosync_exec_handler pcmk_exec_service[] = {
191 {
192 .exec_handler_fn = pcmk_cluster_callback,
193 .exec_endian_convert_fn = pcmk_cluster_swab},
194 {
195 .exec_handler_fn = pcmk_cluster_id_callback,
196 .exec_endian_convert_fn = pcmk_cluster_id_swab}
197 };
198
199
200
201
202
203 struct corosync_service_engine pcmk_service_handler = {
204 .name = (char *)"Pacemaker Cluster Manager "PACEMAKER_VERSION,
205 .id = PCMK_SERVICE_ID,
206 .private_data_size = 0,
207 .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
208 .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
209 .lib_init_fn = pcmk_ipc_connect,
210 .lib_exit_fn = pcmk_ipc_exit,
211 .exec_init_fn = pcmk_startup,
212 .exec_exit_fn = pcmk_shutdown,
213 .config_init_fn = pcmk_config_init,
214 .priority = 50,
215 .lib_engine = pcmk_lib_service,
216 .lib_engine_count = sizeof (pcmk_lib_service) / sizeof (struct corosync_lib_handler),
217 .exec_engine = pcmk_exec_service,
218 .exec_engine_count = sizeof (pcmk_exec_service) / sizeof (struct corosync_exec_handler),
219 .confchg_fn = pcmk_peer_update,
220 .exec_dump_fn = pcmk_exec_dump,
221
222
223
224
225 };
226
227
228
229
230
231 struct corosync_service_engine *pcmk_get_handler_ver0 (void);
232
233 struct corosync_service_engine_iface_ver0 pcmk_service_handler_iface = {
234 .corosync_get_service_engine_ver0 = pcmk_get_handler_ver0
235 };
236
237 static struct lcr_iface openais_pcmk_ver0[2] = {
238 {
239 .name = "pacemaker",
240 .version = 0,
241 .versions_replace = 0,
242 .versions_replace_count = 0,
243 .dependencies = 0,
244 .dependency_count = 0,
245 .constructor = NULL,
246 .destructor = NULL,
247 .interfaces = NULL
248 },
249 {
250 .name = "pacemaker",
251 .version = 1,
252 .versions_replace = 0,
253 .versions_replace_count = 0,
254 .dependencies = 0,
255 .dependency_count = 0,
256 .constructor = NULL,
257 .destructor = NULL,
258 .interfaces = NULL
259 }
260 };
261
262 static struct lcr_comp pcmk_comp_ver0 = {
263 .iface_count = 2,
264 .ifaces = openais_pcmk_ver0
265 };
266
267
268 struct corosync_service_engine *
269 pcmk_get_handler_ver0(void)
270 {
271 return (&pcmk_service_handler);
272 }
273
274 __attribute__ ((constructor))
275 static void
276 register_this_component(void)
277 {
278 lcr_interfaces_set(&openais_pcmk_ver0[0], &pcmk_service_handler_iface);
279 lcr_interfaces_set(&openais_pcmk_ver0[1], &pcmk_service_handler_iface);
280
281 lcr_component_register(&pcmk_comp_ver0);
282 }
283
284 static int
285 plugin_has_quorum(void)
286 {
287 if ((plugin_expected_votes >> 1) < plugin_has_votes) {
288 return 1;
289 }
290 return 0;
291 }
292
293 static void
294 update_expected_votes(int value)
295 {
296 if (value < plugin_has_votes) {
297
298 ais_info("Cannot update expected quorum votes %d -> %d:"
299 " value cannot be less that the current number of votes",
300 plugin_expected_votes, value);
301
302 } else if (plugin_expected_votes != value) {
303 ais_info("Expected quorum votes %d -> %d", plugin_expected_votes, value);
304 plugin_expected_votes = value;
305 }
306 }
307
308
309 static void
310 process_ais_conf(void)
311 {
312 char *value = NULL;
313 gboolean any_log = FALSE;
314 hdb_handle_t top_handle = 0;
315 hdb_handle_t local_handle = 0;
316
317 ais_info("Reading configure");
318 top_handle = config_find_init(pcmk_api, "logging");
319 local_handle = config_find_next(pcmk_api, "logging", top_handle);
320
321 get_config_opt(pcmk_api, local_handle, "debug", &value, "on");
322 if (ais_get_boolean(value)) {
323 plugin_log_level = LOG_DEBUG;
324 pcmk_env.debug = "1";
325
326 } else {
327 plugin_log_level = LOG_INFO;
328 pcmk_env.debug = "0";
329 }
330
331 get_config_opt(pcmk_api, local_handle, "to_logfile", &value, "off");
332 if (ais_get_boolean(value)) {
333 get_config_opt(pcmk_api, local_handle, "logfile", &value, NULL);
334
335 if (value == NULL) {
336 ais_err("Logging to a file requested but no log file specified");
337
338 } else {
339 uid_t pcmk_uid = geteuid();
340 uid_t pcmk_gid = getegid();
341
342 FILE *logfile = fopen(value, "a");
343
344 if (logfile) {
345 int ignore = 0;
346 int logfd = fileno(logfile);
347
348 pcmk_env.logfile = value;
349
350
351 ignore = fchown(logfd, pcmk_uid, pcmk_gid);
352 ignore = fchmod(logfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
353
354 if (ignore < 0) {
355 fprintf(logfile, "Could not set r/w permissions for uid=%d, gid=%d on %s\n",
356 pcmk_uid, pcmk_gid, value);
357
358 } else {
359 fprintf(logfile, "Set r/w permissions for uid=%d, gid=%d on %s\n",
360 pcmk_uid, pcmk_gid, value);
361 }
362 fflush(logfile);
363 fsync(logfd);
364 fclose(logfile);
365 any_log = TRUE;
366
367 } else {
368 ais_err("Couldn't create logfile: %s", value);
369 }
370 }
371 }
372
373 get_config_opt(pcmk_api, local_handle, "to_syslog", &value, "on");
374 if (any_log && ais_get_boolean(value) == FALSE) {
375 ais_info("User configured file based logging and explicitly disabled syslog.");
376 value = "none";
377
378 } else {
379 if (ais_get_boolean(value) == FALSE) {
380 ais_err
381 ("Please enable some sort of logging, either 'to_file: on' or 'to_syslog: on'.");
382 ais_err("If you use file logging, be sure to also define a value for 'logfile'");
383 }
384 get_config_opt(pcmk_api, local_handle, "syslog_facility", &value, "daemon");
385 }
386 pcmk_env.syslog = value;
387
388 config_find_done(pcmk_api, local_handle);
389
390 top_handle = config_find_init(pcmk_api, "quorum");
391 local_handle = config_find_next(pcmk_api, "quorum", top_handle);
392 get_config_opt(pcmk_api, local_handle, "provider", &value, NULL);
393 if (value && ais_str_eq("quorum_cman", value)) {
394 pcmk_env.quorum = "cman";
395 } else {
396 pcmk_env.quorum = "pcmk";
397 }
398
399 top_handle = config_find_init(pcmk_api, "service");
400 local_handle = config_find_next(pcmk_api, "service", top_handle);
401 while (local_handle) {
402 value = NULL;
403 pcmk_api->object_key_get(local_handle, "name", strlen("name"), (void **)&value, NULL);
404 if (ais_str_eq("pacemaker", value)) {
405 break;
406 }
407 local_handle = config_find_next(pcmk_api, "service", top_handle);
408 }
409
410 get_config_opt(pcmk_api, local_handle, "ver", &value, "0");
411 if (ais_str_eq(value, "1")) {
412 ais_info("Enabling MCP mode: Use the Pacemaker init script to complete Pacemaker startup");
413 use_mcp = TRUE;
414 }
415
416 get_config_opt(pcmk_api, local_handle, "clustername", &local_cname, "pcmk");
417 local_cname_len = strlen(local_cname);
418
419 get_config_opt(pcmk_api, local_handle, "use_logd", &value, "no");
420 pcmk_env.use_logd = value;
421
422 get_config_opt(pcmk_api, local_handle, "use_mgmtd", &value, "no");
423 if (ais_get_boolean(value) == FALSE) {
424 int lpc = 0;
425
426 for (; lpc < SIZEOF(pcmk_children); lpc++) {
427 if (crm_proc_mgmtd & pcmk_children[lpc].flag) {
428
429 pcmk_children[lpc].start_seq = 0;
430 break;
431 }
432 }
433 }
434
435 config_find_done(pcmk_api, local_handle);
436 }
437
438 int
439 pcmk_config_init(struct corosync_api_v1 *unused)
440 {
441 return 0;
442 }
443
444 static void *
445 pcmk_wait_dispatch(void *arg)
446 {
447 struct timespec waitsleep = {
448 .tv_sec = 1,
449 .tv_nsec = 0
450 };
451
452 while (wait_active) {
453 int lpc = 0;
454
455 for (; lpc < SIZEOF(pcmk_children); lpc++) {
456 if (pcmk_children[lpc].pid > 0) {
457 int status;
458 pid_t pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
459
460 if (pid == 0) {
461 continue;
462
463 } else if (pid < 0) {
464 ais_perror("Call to wait4(%s) failed", pcmk_children[lpc].name);
465 continue;
466 }
467
468
469 pcmk_children[lpc].pid = 0;
470 pcmk_children[lpc].conn = NULL;
471 pcmk_children[lpc].async_conn = NULL;
472
473 if (WIFSIGNALED(status)) {
474 int sig = WTERMSIG(status);
475
476 ais_err("Child process %s terminated with signal %d"
477 " (pid=%d, core=%s)",
478 pcmk_children[lpc].name, sig, pid,
479 WCOREDUMP(status) ? "true" : "false");
480
481 } else if (WIFEXITED(status)) {
482 int rc = WEXITSTATUS(status);
483
484 do_ais_log(rc == 0 ? LOG_NOTICE : LOG_ERR,
485 "Child process %s exited (pid=%d, rc=%d)", pcmk_children[lpc].name,
486 pid, rc);
487
488 if (rc == 100) {
489 ais_notice("Child process %s no longer wishes"
490 " to be respawned", pcmk_children[lpc].name);
491 pcmk_children[lpc].respawn = FALSE;
492 }
493 }
494
495
496
497
498
499
500
501
502
503 send_cluster_id();
504
505 pcmk_children[lpc].respawn_count += 1;
506 if (pcmk_children[lpc].respawn_count > MAX_RESPAWN) {
507 ais_err("Child respawn count exceeded by %s", pcmk_children[lpc].name);
508 pcmk_children[lpc].respawn = FALSE;
509 }
510 if (pcmk_children[lpc].respawn) {
511 ais_notice("Respawning failed child process: %s", pcmk_children[lpc].name);
512 spawn_child(&(pcmk_children[lpc]));
513 }
514 send_cluster_id();
515 }
516 }
517 sched_yield();
518 nanosleep(&waitsleep, 0);
519 }
520 return 0;
521 }
522
523 static uint32_t
524 pcmk_update_nodeid(void)
525 {
526 int last = local_nodeid;
527
528 local_nodeid = pcmk_api->totem_nodeid_get();
529
530 if (last != local_nodeid) {
531 if (last == 0) {
532 ais_info("Local node id: %u", local_nodeid);
533
534 } else {
535 char *last_s = NULL;
536
537 ais_malloc0(last_s, 32);
538 ais_warn("Detected local node id change: %u -> %u", last, local_nodeid);
539 snprintf(last_s, 31, "%u", last);
540 ais_remove_peer(last_s);
541 ais_free(last_s);
542 }
543 update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL);
544 }
545
546 return local_nodeid;
547 }
548
549 static void
550 build_path(const char *path_c, mode_t mode)
551 {
552 int offset = 1, len = 0;
553 char *path = ais_strdup(path_c);
554
555 AIS_CHECK(path != NULL, return);
556 for (len = strlen(path); offset < len; offset++) {
557 if (path[offset] == '/') {
558 path[offset] = 0;
559 if (mkdir(path, mode) < 0 && errno != EEXIST) {
560 ais_perror("Could not create directory '%s'", path);
561 break;
562 }
563 path[offset] = '/';
564 }
565 }
566 if (mkdir(path, mode) < 0 && errno != EEXIST) {
567 ais_perror("Could not create directory '%s'", path);
568 }
569 ais_free(path);
570 }
571
572 int
573 pcmk_startup(struct corosync_api_v1 *init_with)
574 {
575 int rc = 0;
576 int lpc = 0;
577 int start_seq = 1;
578 struct utsname us;
579 struct rlimit cores;
580 static int max = SIZEOF(pcmk_children);
581
582 uid_t pcmk_uid = 0;
583 gid_t pcmk_gid = 0;
584
585 uid_t root_uid = -1;
586 uid_t cs_uid = geteuid();
587
588 pcmk_user_lookup("root", &root_uid, NULL);
589
590 pcmk_api = init_with;
591
592 pcmk_env.debug = "0";
593 pcmk_env.logfile = NULL;
594 pcmk_env.use_logd = "false";
595 pcmk_env.syslog = "daemon";
596
597 if (cs_uid != root_uid) {
598 ais_err("Corosync must be configured to start as 'root',"
599 " otherwise Pacemaker cannot manage services."
600 " Expected %d got %d", root_uid, cs_uid);
601 return -1;
602 }
603
604 process_ais_conf();
605
606 membership_list = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, destroy_ais_node);
607 membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal);
608 ipc_client_list = g_hash_table_new(g_direct_hash, g_direct_equal);
609
610 ais_info("CRM: Initialized");
611 log_printf(LOG_INFO, "Logging: Initialized %s\n", __FUNCTION__);
612
613 rc = getrlimit(RLIMIT_CORE, &cores);
614 if (rc < 0) {
615 ais_perror("Cannot determine current maximum core size.");
616 } else {
617 if (cores.rlim_max == 0 && geteuid() == 0) {
618 cores.rlim_max = RLIM_INFINITY;
619 } else {
620 ais_info("Maximum core file size is: %lu", cores.rlim_max);
621 }
622 cores.rlim_cur = cores.rlim_max;
623
624 rc = setrlimit(RLIMIT_CORE, &cores);
625 if (rc < 0) {
626 ais_perror("Core file generation will remain disabled."
627 " Core files are an important diagnositic tool,"
628 " please consider enabling them by default.");
629 }
630 #if 0
631
632
633
634 if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) {
635 ais_perror("Could not enable /proc/sys/kernel/core_uses_pid");
636 }
637 #endif
638 }
639
640 if (pcmk_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) {
641 ais_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER);
642 return TRUE;
643 }
644
645 rc = mkdir(CRM_STATE_DIR, 0750);
646 rc = chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid);
647
648
649 build_path(HA_STATE_DIR "/heartbeat", 0755);
650
651
652 build_path(CRM_RSCTMP_DIR, 0755);
653
654 rc = uname(&us);
655 AIS_ASSERT(rc == 0);
656 local_uname = ais_strdup(us.nodename);
657 local_uname_len = strlen(local_uname);
658
659 ais_info("Service: %d", PCMK_SERVICE_ID);
660 ais_info("Local hostname: %s", local_uname);
661 pcmk_update_nodeid();
662
663 if (use_mcp == FALSE) {
664 pthread_create(&pcmk_wait_thread, NULL, pcmk_wait_dispatch, NULL);
665 for (start_seq = 1; start_seq < max; start_seq++) {
666
667 for (lpc = 0; lpc < max; lpc++) {
668 if (start_seq == pcmk_children[lpc].start_seq) {
669 spawn_child(&(pcmk_children[lpc]));
670 }
671 }
672 }
673 }
674 return 0;
675 }
676
677 #if 0
678
679 char *
680 totempg_ifaces_print(unsigned int nodeid)
681 {
682 static char iface_string[256 * INTERFACE_MAX];
683 char one_iface[64];
684 struct totem_ip_address interfaces[INTERFACE_MAX];
685 char **status;
686 unsigned int iface_count;
687 unsigned int i;
688 int res;
689
690 iface_string[0] = '\0';
691
692 res = totempg_ifaces_get(nodeid, interfaces, &status, &iface_count);
693 if (res == -1) {
694 return ("no interface found for nodeid");
695 }
696
697 for (i = 0; i < iface_count; i++) {
698 sprintf(one_iface, "r(%d) ip(%s), ", i, totemip_print(&interfaces[i]));
699 strcat(iface_string, one_iface);
700 }
701 return (iface_string);
702 }
703 #endif
704
705 static void
706 ais_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data)
707 {
708 int *changed = user_data;
709 crm_node_t *node = value;
710
711 if (node->last_seen != membership_seq && ais_str_eq(CRM_NODE_LOST, node->state) == FALSE) {
712 ais_info("Node %s was not seen in the previous transition", node->uname);
713 *changed += update_member(node->id, 0, membership_seq, node->votes,
714 node->processes, node->uname, CRM_NODE_LOST, NULL);
715 }
716 }
717
718 void
719 pcmk_peer_update(enum totem_configuration_type configuration_type,
720 const unsigned int *member_list, size_t member_list_entries,
721 const unsigned int *left_list, size_t left_list_entries,
722 const unsigned int *joined_list, size_t joined_list_entries,
723 const struct memb_ring_id *ring_id)
724 {
725 int lpc = 0;
726 int changed = 0;
727 int do_update = 0;
728
729 AIS_ASSERT(ring_id != NULL);
730 switch (configuration_type) {
731 case TOTEM_CONFIGURATION_REGULAR:
732 do_update = 1;
733 break;
734 case TOTEM_CONFIGURATION_TRANSITIONAL:
735 break;
736 }
737
738 membership_seq = ring_id->seq;
739 ais_notice("%s membership event on ring %lld: memb=%ld, new=%ld, lost=%ld",
740 do_update ? "Stable" : "Transitional", ring_id->seq,
741 (long)member_list_entries, (long)joined_list_entries, (long)left_list_entries);
742
743 if (do_update == 0) {
744 for (lpc = 0; lpc < joined_list_entries; lpc++) {
745 const char *prefix = "new: ";
746 uint32_t nodeid = joined_list[lpc];
747
748 ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
749 }
750 for (lpc = 0; lpc < member_list_entries; lpc++) {
751 const char *prefix = "memb:";
752 uint32_t nodeid = member_list[lpc];
753
754 ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
755 }
756 for (lpc = 0; lpc < left_list_entries; lpc++) {
757 const char *prefix = "lost:";
758 uint32_t nodeid = left_list[lpc];
759
760 ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
761 }
762 return;
763 }
764
765 for (lpc = 0; lpc < joined_list_entries; lpc++) {
766 const char *prefix = "NEW: ";
767 uint32_t nodeid = joined_list[lpc];
768 crm_node_t *node = NULL;
769
770 changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
771
772 ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
773
774 node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid));
775 if (node->addr == NULL) {
776 const char *addr = totempg_ifaces_print(nodeid);
777
778 node->addr = ais_strdup(addr);
779 ais_debug("Node %u has address %s", nodeid, node->addr);
780 }
781 }
782
783 for (lpc = 0; lpc < member_list_entries; lpc++) {
784 const char *prefix = "MEMB:";
785 uint32_t nodeid = member_list[lpc];
786
787 changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
788
789 ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
790 }
791
792 for (lpc = 0; lpc < left_list_entries; lpc++) {
793 const char *prefix = "LOST:";
794 uint32_t nodeid = left_list[lpc];
795
796 changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL);
797 ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
798 }
799
800 if (changed && joined_list_entries == 0 && left_list_entries == 0) {
801 ais_err("Something strange happened: %d", changed);
802 changed = 0;
803 }
804
805 ais_trace("Reaping unseen nodes...");
806 g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed);
807
808 if (member_list_entries > 1) {
809
810
811
812
813 have_reliable_membership_id = TRUE;
814 }
815
816 if (changed) {
817 ais_debug("%d nodes changed", changed);
818 pcmk_update_nodeid();
819 send_member_notification();
820 }
821
822 send_cluster_id();
823 }
824
825 int
826 pcmk_ipc_exit(void *conn)
827 {
828 int lpc = 0;
829 const char *client = NULL;
830 void *async_conn = conn;
831
832 for (; lpc < SIZEOF(pcmk_children); lpc++) {
833 if (pcmk_children[lpc].conn == conn) {
834 if (wait_active == FALSE) {
835
836 pcmk_children[lpc].pid = 0;
837 }
838 pcmk_children[lpc].conn = NULL;
839 pcmk_children[lpc].async_conn = NULL;
840 client = pcmk_children[lpc].name;
841 break;
842 }
843 }
844
845 g_hash_table_remove(membership_notify_list, async_conn);
846 g_hash_table_remove(ipc_client_list, async_conn);
847
848 if (client) {
849 do_ais_log(LOG_INFO, "Client %s (conn=%p, async-conn=%p) left", client, conn, async_conn);
850 } else {
851 do_ais_log((LOG_DEBUG + 1), "Client %s (conn=%p, async-conn=%p) left",
852 "unknown-transient", conn, async_conn);
853 }
854
855 return (0);
856 }
857
858 int
859 pcmk_ipc_connect(void *conn)
860 {
861
862
863
864 return (0);
865 }
866
867
868
869
870 void
871 pcmk_cluster_swab(void *msg)
872 {
873 AIS_Message *ais_msg = msg;
874
875 ais_trace("Performing endian conversion...");
876 ais_msg->id = swab32(ais_msg->id);
877 ais_msg->size = swab32(ais_msg->size);
878 ais_msg->is_compressed = swab32(ais_msg->is_compressed);
879 ais_msg->compressed_size = swab32(ais_msg->compressed_size);
880
881 ais_msg->host.id = swab32(ais_msg->host.id);
882 ais_msg->host.pid = swab32(ais_msg->host.pid);
883 ais_msg->host.type = swab32(ais_msg->host.type);
884 ais_msg->host.size = swab32(ais_msg->host.size);
885 ais_msg->host.local = swab32(ais_msg->host.local);
886
887 ais_msg->sender.id = swab32(ais_msg->sender.id);
888 ais_msg->sender.pid = swab32(ais_msg->sender.pid);
889 ais_msg->sender.type = swab32(ais_msg->sender.type);
890 ais_msg->sender.size = swab32(ais_msg->sender.size);
891 ais_msg->sender.local = swab32(ais_msg->sender.local);
892
893 ais_msg->header.size = swab32(ais_msg->header.size);
894 ais_msg->header.id = swab32(ais_msg->header.id);
895 ais_msg->header.error = swab32(ais_msg->header.error);
896 }
897
898 void
899 pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid)
900 {
901 const AIS_Message *ais_msg = message;
902
903 ais_trace("Message from node %u (%s)", nodeid, nodeid == local_nodeid ? "local" : "remote");
904
905
906
907
908
909 if (ais_msg->host.size == 0 || ais_str_eq(ais_msg->host.uname, local_uname)) {
910 route_ais_message(ais_msg, FALSE);
911
912 } else {
913 ais_trace("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)",
914 ais_msg->id, ais_dest(&(ais_msg->host)),
915 msg_type2text(ais_msg->host.type),
916 ais_dest(&(ais_msg->sender)), msg_type2text(ais_msg->sender.type));
917 }
918 }
919
920 void
921 pcmk_cluster_id_swab(void *msg)
922 {
923 struct crm_identify_msg_s *ais_msg = msg;
924
925 ais_trace("Performing endian conversion...");
926 ais_msg->id = swab32(ais_msg->id);
927 ais_msg->pid = swab32(ais_msg->pid);
928 ais_msg->votes = swab32(ais_msg->votes);
929 ais_msg->processes = swab32(ais_msg->processes);
930 ais_msg->born_on = swab64(ais_msg->born_on);
931
932 ais_msg->header.size = swab32(ais_msg->header.size);
933 ais_msg->header.id = swab32(ais_msg->header.id);
934 }
935
936 void
937 pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid)
938 {
939 int changed = 0;
940 const struct crm_identify_msg_s *msg = message;
941
942 if (nodeid != msg->id) {
943 ais_err("Invalid message: Node %u claimed to be node %d", nodeid, msg->id);
944 return;
945 }
946 ais_debug("Node update: %s (%s)", msg->uname, msg->version);
947 changed =
948 update_member(nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, msg->uname,
949 NULL, msg->version);
950
951 if (changed) {
952 send_member_notification();
953 }
954 }
955
956 struct res_overlay {
957 cs_ipc_header_response_t header __attribute((aligned(8)));
958 char buf[4096];
959 };
960
961 struct res_overlay *res_overlay = NULL;
962
963 static void
964 send_ipc_ack(void *conn)
965 {
966 if (res_overlay == NULL) {
967 ais_malloc0(res_overlay, sizeof(struct res_overlay));
968 }
969
970 res_overlay->header.id = CRM_MESSAGE_IPC_ACK;
971 res_overlay->header.size = sizeof(cs_ipc_header_response_t);
972 res_overlay->header.error = CS_OK;
973 pcmk_api->ipc_response_send(conn, res_overlay, res_overlay->header.size);
974 }
975
976
977 void
978 pcmk_ipc(void *conn, ais_void_ptr * msg)
979 {
980 AIS_Message *mutable;
981 int type = 0;
982 gboolean transient = TRUE;
983 const AIS_Message *ais_msg = (const AIS_Message *)msg;
984 void *async_conn = conn;
985
986 ais_trace("Message from client %p", conn);
987
988 if (check_message_sanity(msg, ((const AIS_Message *)msg)->data) == FALSE) {
989
990 send_ipc_ack(conn);
991 msg = NULL;
992 return;
993 }
994
995
996
997
998
999
1000 mutable = ais_msg_copy(ais_msg);
1001 AIS_ASSERT(check_message_sanity(mutable, mutable->data));
1002
1003 type = mutable->sender.type;
1004 ais_trace
1005 ("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d",
1006 type, mutable->host.local, pcmk_children[type].conn, mutable->host.type, crm_msg_ais,
1007 mutable->sender.pid, pcmk_children[type].pid, ((int)SIZEOF(pcmk_children)));
1008
1009 if (type > crm_msg_none && type < SIZEOF(pcmk_children)) {
1010
1011 transient = FALSE;
1012 }
1013 #if 0
1014
1015
1016
1017 AIS_CHECK(transient || mutable->sender.pid == pcmk_children[type].pid,
1018 ais_err("Sender: %d, child[%d]: %d", mutable->sender.pid, type,
1019 pcmk_children[type].pid);
1020 ais_free(mutable);
1021 return);
1022 #endif
1023
1024 if (transient == FALSE
1025 && type > crm_msg_none
1026 && mutable->host.local
1027 && pcmk_children[type].conn == NULL && mutable->host.type == crm_msg_ais) {
1028 AIS_CHECK(mutable->sender.type != mutable->sender.pid,
1029 ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
1030
1031 ais_info("Recorded connection %p for %s/%d",
1032 conn, pcmk_children[type].name, pcmk_children[type].pid);
1033 pcmk_children[type].conn = conn;
1034 pcmk_children[type].async_conn = async_conn;
1035
1036
1037 if (pcmk_children[type].flags & crm_flag_members) {
1038 char *update = pcmk_generate_membership_data();
1039
1040 g_hash_table_replace(membership_notify_list, async_conn, async_conn);
1041 ais_info("Sending membership update " U64T " to %s",
1042 membership_seq, pcmk_children[type].name);
1043 send_client_msg(async_conn, crm_class_members, crm_msg_none, update);
1044 }
1045
1046 } else if (transient) {
1047 AIS_CHECK(mutable->sender.type == mutable->sender.pid,
1048 ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
1049 g_hash_table_replace(ipc_client_list, async_conn, GUINT_TO_POINTER(mutable->sender.pid));
1050 }
1051
1052 mutable->sender.id = local_nodeid;
1053 mutable->sender.size = local_uname_len;
1054 memset(mutable->sender.uname, 0, MAX_NAME);
1055 memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
1056
1057 route_ais_message(mutable, TRUE);
1058 send_ipc_ack(conn);
1059 msg = NULL;
1060 ais_free(mutable);
1061 }
1062
1063 int
1064 pcmk_shutdown(void)
1065 {
1066 int lpc = 0;
1067 static int phase = 0;
1068 static int max_wait = 0;
1069 static time_t next_log = 0;
1070 static int max = SIZEOF(pcmk_children);
1071
1072 if (use_mcp) {
1073 if (pcmk_children[crm_msg_crmd].conn || pcmk_children[crm_msg_stonith_ng].conn) {
1074 time_t now = time(NULL);
1075
1076 if (now > next_log) {
1077 next_log = now + 300;
1078 ais_notice
1079 ("Preventing Corosync shutdown. Please ensure Pacemaker is stopped first.");
1080 }
1081 return -1;
1082 }
1083 ais_notice("Unloading Pacemaker plugin");
1084 return 0;
1085 }
1086
1087 if (phase == 0) {
1088 ais_notice("Shutting down Pacemaker");
1089 phase = max;
1090 }
1091
1092 wait_active = FALSE;
1093
1094 for (; phase > 0; phase--) {
1095
1096
1097 for (lpc = max - 1; lpc >= 0; lpc--) {
1098 if (phase != pcmk_children[lpc].start_seq) {
1099 continue;
1100 }
1101
1102 if (pcmk_children[lpc].pid) {
1103 pid_t pid = 0;
1104 int status = 0;
1105 time_t now = time(NULL);
1106
1107 if (pcmk_children[lpc].respawn) {
1108 max_wait = 5;
1109 next_log = now + 30;
1110 pcmk_children[lpc].respawn = FALSE;
1111 stop_child(&(pcmk_children[lpc]), SIGTERM);
1112 }
1113
1114 pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
1115 if (pid < 0) {
1116 ais_perror("Call to wait4(%s/%d) failed - treating it as stopped",
1117 pcmk_children[lpc].name, pcmk_children[lpc].pid);
1118
1119 } else if (pid == 0) {
1120 if (now >= next_log) {
1121 max_wait--;
1122 next_log = now + 30;
1123 ais_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...",
1124 pcmk_children[lpc].name, pcmk_children[lpc].pid,
1125 pcmk_children[lpc].start_seq);
1126 if (max_wait <= 0 && phase < pcmk_children[crm_msg_crmd].start_seq) {
1127 ais_err("Child %s taking too long to terminate, sending SIGKILL",
1128 pcmk_children[lpc].name);
1129 stop_child(&(pcmk_children[lpc]), SIGKILL);
1130 }
1131 }
1132
1133 return -1;
1134 }
1135 }
1136
1137
1138 ais_notice("%s confirmed stopped", pcmk_children[lpc].name);
1139 pcmk_children[lpc].async_conn = NULL;
1140 pcmk_children[lpc].conn = NULL;
1141 pcmk_children[lpc].pid = 0;
1142 }
1143 }
1144
1145 send_cluster_id();
1146 ais_notice("Shutdown complete");
1147
1148
1149 return 0;
1150 }
1151
1152 struct member_loop_data {
1153 char *string;
1154 };
1155
1156 static void
1157 member_vote_count_fn(gpointer key, gpointer value, gpointer user_data)
1158 {
1159 crm_node_t *node = value;
1160
1161 if (ais_str_eq(CRM_NODE_MEMBER, node->state)) {
1162 plugin_has_votes += node->votes;
1163 }
1164 }
1165
1166 void
1167 member_loop_fn(gpointer key, gpointer value, gpointer user_data)
1168 {
1169 crm_node_t *node = value;
1170 struct member_loop_data *data = user_data;
1171
1172 ais_trace("Dumping node %u", node->id);
1173 data->string = append_member(data->string, node);
1174 }
1175
1176 char *
1177 pcmk_generate_membership_data(void)
1178 {
1179 int size = 0;
1180 struct member_loop_data data;
1181
1182 size = 256;
1183 ais_malloc0(data.string, size);
1184
1185
1186 update_member(local_nodeid, 0, 0, -1, get_process_list(), local_uname, CRM_NODE_MEMBER, NULL);
1187
1188 plugin_has_votes = 0;
1189 g_hash_table_foreach(membership_list, member_vote_count_fn, NULL);
1190 if (plugin_has_votes > plugin_expected_votes) {
1191 update_expected_votes(plugin_has_votes);
1192 }
1193
1194 snprintf(data.string, size,
1195 "<nodes id=\"" U64T "\" quorate=\"%s\" expected=\"%u\" actual=\"%u\">",
1196 membership_seq, plugin_has_quorum()? "true" : "false",
1197 plugin_expected_votes, plugin_has_votes);
1198
1199 g_hash_table_foreach(membership_list, member_loop_fn, &data);
1200 size = strlen(data.string);
1201 data.string = realloc_safe(data.string, size + 9);
1202 sprintf(data.string + size, "</nodes>");
1203 return data.string;
1204 }
1205
1206 void
1207 pcmk_nodes(void *conn, ais_void_ptr * msg)
1208 {
1209 char *data = pcmk_generate_membership_data();
1210 void *async_conn = conn;
1211
1212
1213
1214
1215 send_ipc_ack(conn);
1216 msg = NULL;
1217
1218 if (async_conn) {
1219 send_client_msg(async_conn, crm_class_members, crm_msg_none, data);
1220 }
1221 ais_free(data);
1222 }
1223
1224 void
1225 pcmk_remove_member(void *conn, ais_void_ptr * msg)
1226 {
1227 const AIS_Message *ais_msg = msg;
1228 char *data = get_ais_data(ais_msg);
1229
1230 send_ipc_ack(conn);
1231 msg = NULL;
1232
1233 if (data != NULL) {
1234 char *bcast = ais_concat("remove-peer", data, ':');
1235
1236 send_plugin_msg(crm_msg_ais, NULL, bcast);
1237 ais_info("Sent: %s", bcast);
1238 ais_free(bcast);
1239 }
1240
1241 ais_free(data);
1242 }
1243
1244 static void
1245 send_quorum_details(void *conn)
1246 {
1247 int size = 256;
1248 char *data = NULL;
1249
1250 ais_malloc0(data, size);
1251
1252 snprintf(data, size, "<quorum id=\"" U64T "\" quorate=\"%s\" expected=\"%u\" actual=\"%u\"/>",
1253 membership_seq, plugin_has_quorum()? "true" : "false",
1254 plugin_expected_votes, plugin_has_votes);
1255
1256 send_client_msg(conn, crm_class_quorum, crm_msg_none, data);
1257 ais_free(data);
1258 }
1259
1260 void
1261 pcmk_quorum(void *conn, ais_void_ptr * msg)
1262 {
1263 char *dummy = NULL;
1264 const AIS_Message *ais_msg = msg;
1265 char *data = get_ais_data(ais_msg);
1266
1267 send_ipc_ack(conn);
1268 msg = NULL;
1269
1270
1271 dummy = pcmk_generate_membership_data();
1272 ais_free(dummy);
1273
1274
1275 if (data != NULL && strlen(data) > 0) {
1276 int value = ais_get_int(data, NULL);
1277
1278 update_expected_votes(value);
1279 }
1280
1281 send_quorum_details(conn);
1282 ais_free(data);
1283 }
1284
1285 void
1286 pcmk_notify(void *conn, ais_void_ptr * msg)
1287 {
1288 const AIS_Message *ais_msg = msg;
1289 char *data = get_ais_data(ais_msg);
1290 void *async_conn = conn;
1291
1292 int enable = 0;
1293 int sender = ais_msg->sender.pid;
1294
1295 send_ipc_ack(conn);
1296 msg = NULL;
1297
1298 if (ais_str_eq("true", data)) {
1299 enable = 1;
1300 }
1301
1302 ais_info("%s node notifications for child %d (%p)",
1303 enable ? "Enabling" : "Disabling", sender, async_conn);
1304 if (enable) {
1305 g_hash_table_replace(membership_notify_list, async_conn, async_conn);
1306 } else {
1307 g_hash_table_remove(membership_notify_list, async_conn);
1308 }
1309 ais_free(data);
1310 }
1311
1312 void
1313 pcmk_nodeid(void *conn, ais_void_ptr * msg)
1314 {
1315 static int counter = 0;
1316 struct crm_ais_nodeid_resp_s resp;
1317
1318 ais_trace("Sending local nodeid: %d to %p[%d]", local_nodeid, conn, counter);
1319
1320 resp.header.id = crm_class_nodeid;
1321 resp.header.size = sizeof(struct crm_ais_nodeid_resp_s);
1322 resp.header.error = CS_OK;
1323 resp.id = local_nodeid;
1324 resp.counter = counter++;
1325 memset(resp.uname, 0, MAX_NAME);
1326 memcpy(resp.uname, local_uname, local_uname_len);
1327 memset(resp.cname, 0, MAX_NAME);
1328 memcpy(resp.cname, local_cname, local_cname_len);
1329
1330 pcmk_api->ipc_response_send(conn, &resp, resp.header.size);
1331 }
1332
1333 static gboolean
1334 ghash_send_update(gpointer key, gpointer value, gpointer data)
1335 {
1336 if (send_client_msg(value, crm_class_members, crm_msg_none, data) != 0) {
1337
1338 return TRUE;
1339 }
1340 return FALSE;
1341 }
1342
1343 void
1344 send_member_notification(void)
1345 {
1346 char *update = pcmk_generate_membership_data();
1347
1348 ais_info("Sending membership update " U64T " to %d children",
1349 membership_seq, g_hash_table_size(membership_notify_list));
1350
1351 g_hash_table_foreach_remove(membership_notify_list, ghash_send_update, update);
1352 ais_free(update);
1353 }
1354
1355 gboolean
1356 check_message_sanity(const AIS_Message * msg, const char *data)
1357 {
1358 gboolean sane = TRUE;
1359 gboolean repaired = FALSE;
1360 int dest = msg->host.type;
1361 int tmp_size = msg->header.size - sizeof(AIS_Message);
1362
1363 if (sane && msg->header.size == 0) {
1364 ais_err("Message with no size");
1365 sane = FALSE;
1366 }
1367
1368 if (sane && msg->header.error != CS_OK) {
1369 ais_err("Message header contains an error: %d", msg->header.error);
1370 sane = FALSE;
1371 }
1372
1373 AIS_CHECK(msg->header.size > sizeof(AIS_Message),
1374 ais_err("Message %d size too small: %d < %llu",
1375 msg->header.id, msg->header.size,
1376 (unsigned long long) sizeof(AIS_Message));
1377 return FALSE);
1378
1379 if (sane && ais_data_len(msg) != tmp_size) {
1380 ais_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg),
1381 tmp_size);
1382 sane = TRUE;
1383 }
1384
1385 if (sane && ais_data_len(msg) == 0) {
1386 ais_err("Message with no payload");
1387 sane = FALSE;
1388 }
1389
1390 if (sane && data && msg->is_compressed == FALSE) {
1391 int str_size = strlen(data) + 1;
1392
1393 if (ais_data_len(msg) != str_size) {
1394 int lpc = 0;
1395
1396 ais_err("Message payload is corrupted: expected %d bytes, got %d",
1397 ais_data_len(msg), str_size);
1398 sane = FALSE;
1399 for (lpc = (str_size - 10); lpc < msg->size; lpc++) {
1400 if (lpc < 0) {
1401 lpc = 0;
1402 }
1403 ais_trace("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
1404 }
1405 }
1406 }
1407
1408 if (sane == FALSE) {
1409 AIS_CHECK(sane,
1410 ais_err
1411 ("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
1412 msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
1413 msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
1414 ais_data_len(msg), msg->header.size));
1415
1416 } else if (repaired) {
1417 ais_err
1418 ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
1419 msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
1420 msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
1421 ais_data_len(msg), msg->header.size);
1422 } else {
1423 ais_trace
1424 ("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
1425 msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
1426 msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
1427 ais_data_len(msg), msg->header.size);
1428 }
1429 return sane;
1430 }
1431
1432 static int delivered_transient = 0;
1433 static void
1434 deliver_transient_msg(gpointer key, gpointer value, gpointer user_data)
1435 {
1436 int pid = GPOINTER_TO_INT(value);
1437 AIS_Message *mutable = user_data;
1438
1439 if (pid == mutable->host.type) {
1440 int rc = send_client_ipc(key, mutable);
1441
1442 delivered_transient++;
1443
1444 ais_info("Sent message to %s.%d (rc=%d)", ais_dest(&(mutable->host)), pid, rc);
1445 if (rc != 0) {
1446 ais_warn("Sending message to %s.%d failed (rc=%d)",
1447 ais_dest(&(mutable->host)), pid, rc);
1448 log_ais_message(LOG_DEBUG, mutable);
1449 }
1450 }
1451 }
1452
1453 gboolean
1454 route_ais_message(const AIS_Message * msg, gboolean local_origin)
1455 {
1456 int rc = 0;
1457 int dest = msg->host.type;
1458 const char *reason = "unknown";
1459 AIS_Message *mutable = ais_msg_copy(msg);
1460 static int service_id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
1461
1462 ais_trace("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)",
1463 mutable->id, ais_dest(&(mutable->host)), msg_type2text(dest),
1464 ais_dest(&(mutable->sender)), msg_type2text(mutable->sender.type),
1465 mutable->sender.pid, local_origin ? "false" : "true", ais_data_len((mutable)));
1466
1467 if (local_origin == FALSE) {
1468 if (mutable->host.size == 0 || ais_str_eq(local_uname, mutable->host.uname)) {
1469 mutable->host.local = TRUE;
1470 }
1471 }
1472
1473 if (check_message_sanity(mutable, mutable->data) == FALSE) {
1474
1475 rc = 1;
1476 goto bail;
1477 }
1478
1479 if (mutable->host.local) {
1480 void *conn = NULL;
1481 const char *lookup = NULL;
1482 int children_index = 0;
1483
1484 if (dest == crm_msg_ais) {
1485 process_ais_message(mutable);
1486 goto bail;
1487
1488 } else if (dest == crm_msg_lrmd) {
1489
1490 dest = crm_msg_crmd;
1491
1492 } else if (dest == crm_msg_te) {
1493
1494 dest = crm_msg_crmd;
1495
1496 } else if (dest >= SIZEOF(pcmk_children)) {
1497
1498
1499 delivered_transient = 0;
1500 g_hash_table_foreach(ipc_client_list, deliver_transient_msg, mutable);
1501 if (delivered_transient) {
1502 ais_trace("Sent message to %d transient clients: %d", delivered_transient, dest);
1503 goto bail;
1504
1505 } else {
1506
1507 ais_trace("Sending message to transient client %d via crmd", dest);
1508 dest = crm_msg_crmd;
1509 }
1510
1511 } else if (dest == 0) {
1512 ais_err("Invalid destination: %d", dest);
1513 log_ais_message(LOG_ERR, mutable);
1514 log_printf(LOG_ERR, "%s", get_ais_data(mutable));
1515 rc = 1;
1516 goto bail;
1517 }
1518
1519 lookup = msg_type2text(dest);
1520
1521 if (dest == crm_msg_pe && ais_str_eq(pcmk_children[7].name, lookup)) {
1522 children_index = 7;
1523
1524 } else {
1525 children_index = dest;
1526 }
1527
1528 conn = pcmk_children[children_index].async_conn;
1529
1530 if (mutable->header.id == service_id) {
1531 mutable->header.id = 0;
1532
1533 } else if (mutable->header.id != 0) {
1534 ais_err("reset header id back to zero from %d", mutable->header.id);
1535 mutable->header.id = 0;
1536 }
1537
1538 reason = "ipc delivery failed";
1539 rc = send_client_ipc(conn, mutable);
1540
1541 } else if (local_origin) {
1542
1543 ais_trace("Forwarding to cluster");
1544 reason = "cluster delivery failed";
1545 rc = send_plugin_msg_raw(mutable);
1546 }
1547
1548 if (rc != 0) {
1549 ais_warn("Sending message to %s.%s failed: %s (rc=%d)",
1550 ais_dest(&(mutable->host)), msg_type2text(dest), reason, rc);
1551 log_ais_message(LOG_DEBUG, mutable);
1552 }
1553
1554 bail:
1555 ais_free(mutable);
1556 return rc == 0 ? TRUE : FALSE;
1557 }
1558
1559 int
1560 send_plugin_msg_raw(const AIS_Message * ais_msg)
1561 {
1562 int rc = 0;
1563 struct iovec iovec;
1564 static uint32_t msg_id = 0;
1565 AIS_Message *mutable = ais_msg_copy(ais_msg);
1566
1567 AIS_ASSERT(local_nodeid != 0);
1568 AIS_ASSERT(ais_msg->header.size == (sizeof(AIS_Message) + ais_data_len(ais_msg)));
1569
1570 if (mutable->id == 0) {
1571 msg_id++;
1572 AIS_CHECK(msg_id != 0 ,
1573 msg_id++; ais_err("Message ID wrapped around"));
1574 mutable->id = msg_id;
1575 }
1576
1577 mutable->header.error = CS_OK;
1578 mutable->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
1579
1580 mutable->sender.id = local_nodeid;
1581 mutable->sender.size = local_uname_len;
1582 memset(mutable->sender.uname, 0, MAX_NAME);
1583 memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
1584
1585 iovec.iov_base = (char *)mutable;
1586 iovec.iov_len = mutable->header.size;
1587
1588 ais_trace("Sending message (size=%u)", (unsigned int)iovec.iov_len);
1589 rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
1590
1591 if (rc == 0 && mutable->is_compressed == FALSE) {
1592 ais_trace("Message sent: %.80s", mutable->data);
1593 }
1594
1595 AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, mutable->data));
1596
1597 ais_free(mutable);
1598 return rc;
1599 }
1600
1601 #define min(x,y) (x)<(y)?(x):(y)
1602
1603 void
1604 send_cluster_id(void)
1605 {
1606 int rc = 0;
1607 int len = 0;
1608 time_t now = time(NULL);
1609 struct iovec iovec;
1610 struct crm_identify_msg_s *msg = NULL;
1611
1612 static time_t started = 0;
1613 static uint64_t first_seq = 0;
1614
1615 AIS_ASSERT(local_nodeid != 0);
1616
1617 if (started == 0) {
1618 started = now;
1619 first_seq = membership_seq;
1620 }
1621
1622 if (local_born_on == 0) {
1623 if (started + 15 < now) {
1624 ais_debug("Born-on set to: " U64T " (age)", first_seq);
1625 local_born_on = first_seq;
1626
1627 } else if (have_reliable_membership_id) {
1628 ais_debug("Born-on set to: " U64T " (peer)", membership_seq);
1629 local_born_on = membership_seq;
1630
1631 } else {
1632 ais_debug("Leaving born-on unset: " U64T, membership_seq);
1633 }
1634 }
1635
1636 ais_malloc0(msg, sizeof(struct crm_identify_msg_s));
1637 msg->header.size = sizeof(struct crm_identify_msg_s);
1638
1639 msg->id = local_nodeid;
1640
1641 msg->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 1);
1642
1643 len = min(local_uname_len, MAX_NAME - 1);
1644 memset(msg->uname, 0, MAX_NAME);
1645 memcpy(msg->uname, local_uname, len);
1646
1647 len = min(strlen(VERSION), MAX_NAME - 1);
1648 memset(msg->version, 0, MAX_NAME);
1649 memcpy(msg->version, VERSION, len);
1650
1651 msg->votes = 1;
1652 msg->pid = getpid();
1653 msg->processes = get_process_list();
1654 msg->born_on = local_born_on;
1655
1656 ais_debug("Local update: id=%u, born=" U64T ", seq=" U64T "",
1657 local_nodeid, local_born_on, membership_seq);
1658 update_member(local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL,
1659 NULL, VERSION);
1660
1661 iovec.iov_base = (char *)msg;
1662 iovec.iov_len = msg->header.size;
1663
1664 rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
1665
1666 AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc));
1667
1668 ais_free(msg);
1669 }
1670
1671 static gboolean
1672 ghash_send_removal(gpointer key, gpointer value, gpointer data)
1673 {
1674 send_quorum_details(value);
1675 if (send_client_msg(value, crm_class_rmpeer, crm_msg_none, data) != 0) {
1676
1677 return TRUE;
1678 }
1679 return FALSE;
1680 }
1681
1682 void
1683 ais_remove_peer(char *node_id)
1684 {
1685 uint32_t id = ais_get_int(node_id, NULL);
1686 crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));
1687
1688 if (node == NULL) {
1689 ais_info("Peer %u is unknown", id);
1690
1691 } else if (ais_str_eq(CRM_NODE_MEMBER, node->state)) {
1692 ais_warn("Peer %u/%s is still active", id, node->uname);
1693
1694 } else if (g_hash_table_remove(membership_list, GUINT_TO_POINTER(id))) {
1695 plugin_expected_votes--;
1696 ais_notice("Removed dead peer %u from the membership list", id);
1697 ais_info("Sending removal of %u to %d children",
1698 id, g_hash_table_size(membership_notify_list));
1699
1700 g_hash_table_foreach_remove(membership_notify_list, ghash_send_removal, node_id);
1701
1702 } else {
1703 ais_warn("Peer %u/%s was not removed", id, node->uname);
1704 }
1705 }
1706
1707 void
1708 ais_remove_peer_by_name(const char *node_name)
1709 {
1710 GHashTableIter iter;
1711 gpointer key = 0;
1712 crm_node_t *node = NULL;
1713 GList *node_list = NULL;
1714
1715 g_hash_table_iter_init(&iter, membership_list);
1716
1717 while (g_hash_table_iter_next(&iter, &key, (void **)&node)) {
1718 if (ais_str_eq(node_name, node->uname)) {
1719 uint32_t node_id = GPOINTER_TO_UINT(key);
1720 char *node_id_s = NULL;
1721
1722 ais_malloc0(node_id_s, 32);
1723 snprintf(node_id_s, 31, "%u", node_id);
1724 node_list = g_list_append(node_list, node_id_s);
1725 }
1726 }
1727
1728 if (node_list) {
1729 GList *gIter = NULL;
1730
1731 for (gIter = node_list; gIter != NULL; gIter = gIter->next) {
1732 char *node_id_s = gIter->data;
1733
1734 ais_remove_peer(node_id_s);
1735 }
1736 g_list_free_full(node_list, free);
1737
1738 } else {
1739 ais_warn("Peer %s is unkown", node_name);
1740 }
1741 }
1742
1743 gboolean
1744 process_ais_message(const AIS_Message * msg)
1745 {
1746 int len = ais_data_len(msg);
1747 char *data = get_ais_data(msg);
1748
1749 do_ais_log(LOG_DEBUG,
1750 "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s",
1751 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
1752 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
1753 msg->sender.pid,
1754 msg->sender.uname == local_uname ? "false" : "true", ais_data_len(msg), data);
1755
1756 if (data && len > 12 && strncmp("remove-peer:", data, 12) == 0) {
1757 char *node = data + 12;
1758
1759 ais_remove_peer_by_name(node);
1760 }
1761
1762 ais_free(data);
1763 return TRUE;
1764 }
1765
1766 static void
1767 member_dump_fn(gpointer key, gpointer value, gpointer user_data)
1768 {
1769 crm_node_t *node = value;
1770
1771 ais_info(" node id:%u, uname=%s state=%s processes=%.16x born=" U64T " seen=" U64T
1772 " addr=%s version=%s", node->id, node->uname ? node->uname : "-unknown-", node->state,
1773 node->processes, node->born, node->last_seen, node->addr ? node->addr : "-unknown-",
1774 node->version ? node->version : "-unknown-");
1775 }
1776
1777 void
1778 pcmk_exec_dump(void)
1779 {
1780
1781 process_ais_conf();
1782 ais_info("Local id: %u, uname: %s, born: " U64T, local_nodeid, local_uname, local_born_on);
1783 ais_info("Membership id: " U64T ", quorate: %s, expected: %u, actual: %u",
1784 membership_seq, plugin_has_quorum()? "true" : "false",
1785 plugin_expected_votes, plugin_has_votes);
1786
1787 g_hash_table_foreach(membership_list, member_dump_fn, NULL);
1788 }