24 #include <qb/qbarray.h> 26 struct mainloop_child_s {
48 struct mainloop_timer_s {
58 crm_trigger_prepare(GSource * source, gint *
timeout)
85 crm_trigger_check(GSource * source)
103 crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
105 gboolean rc = G_SOURCE_CONTINUE;
110 return G_SOURCE_CONTINUE;
112 trig->trigger = FALSE;
115 int callback_rc = callback(trig->user_data);
117 if (callback_rc < 0) {
118 crm_trace(
"Trigger handler %p not yet complete", trig);
119 trig->running = TRUE;
120 }
else if (callback_rc == 0) {
121 rc = G_SOURCE_REMOVE;
128 crm_trigger_finalize(GSource * source)
130 crm_trace(
"Trigger %p destroyed", source);
133 static GSourceFuncs crm_trigger_funcs = {
136 crm_trigger_dispatch,
137 crm_trigger_finalize,
141 mainloop_setup_trigger(GSource * source,
int priority,
int (*dispatch) (gpointer user_data),
149 trigger->trigger = FALSE;
150 trigger->user_data = userdata;
153 g_source_set_callback(source, dispatch, trigger, NULL);
156 g_source_set_priority(source, priority);
157 g_source_set_can_recurse(source, FALSE);
159 trigger->id = g_source_attach(source, NULL);
166 crm_trace(
"Trigger handler %p complete", trig);
167 trig->running = FALSE;
186 GSource *source = NULL;
189 source = g_source_new(&crm_trigger_funcs,
sizeof(
crm_trigger_t));
191 return mainloop_setup_trigger(source, priority, dispatch, userdata);
198 source->trigger = TRUE;
211 gs = (GSource *)source;
213 g_source_destroy(gs);
228 typedef struct signal_s {
230 void (*handler) (
int sig);
249 crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
253 if(sig->signal != SIGCHLD) {
254 crm_notice(
"Caught '%s' signal " QB_XS
" %d (%s handler)",
255 strsignal(sig->signal), sig->signal,
256 (sig->handler?
"invoking" :
"no"));
259 sig->trigger.trigger = FALSE;
261 sig->handler(sig->signal);
276 mainloop_signal_handler(
int sig)
278 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
284 static GSourceFuncs crm_signal_funcs = {
288 crm_trigger_finalize,
308 struct sigaction old;
310 if (sigemptyset(&mask) < 0) {
311 crm_err(
"Could not set handler for signal %d: %s",
316 memset(&sa, 0,
sizeof(
struct sigaction));
317 sa.sa_handler = dispatch;
318 sa.sa_flags = SA_RESTART;
321 if (sigaction(sig, &sa, &old) < 0) {
322 crm_err(
"Could not set handler for signal %d: %s",
326 return old.sa_handler;
330 mainloop_destroy_signal_entry(
int sig)
335 crm_signals[sig] = NULL;
336 crm_trace(
"Unregistering mainloop handler for signal %d", sig);
355 GSource *source = NULL;
356 int priority = G_PRIORITY_HIGH - 1;
358 if (sig == SIGTERM) {
366 if (sig >= NSIG || sig < 0) {
367 crm_err(
"Signal %d is out of range", sig);
370 }
else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
371 crm_trace(
"Signal handler for %d is already installed", sig);
374 }
else if (crm_signals[sig] != NULL) {
375 crm_err(
"Different signal handler for %d is already installed", sig);
380 source = g_source_new(&crm_signal_funcs,
sizeof(
crm_signal_t));
382 crm_signals[sig] = (
crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
385 crm_signals[sig]->handler = dispatch;
386 crm_signals[sig]->signal = sig;
389 mainloop_destroy_signal_entry(sig);
399 if (sig >= NSIG || sig < 0) {
400 crm_err(
"Signal %d is out of range", sig);
404 crm_perror(LOG_ERR,
"Could not uninstall signal handler for signal %d", sig);
407 }
else if (crm_signals[sig] == NULL) {
410 mainloop_destroy_signal_entry(sig);
414 static qb_array_t *gio_map = NULL;
419 if (gio_map != NULL) {
420 qb_array_free(gio_map);
424 for (
int sig = 0; sig < NSIG; ++sig) {
425 mainloop_destroy_signal_entry(sig);
432 struct gio_to_qb_poll {
437 qb_ipcs_dispatch_fn_t fn;
438 enum qb_loop_priority p;
442 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer
data)
444 struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)
data;
445 gint fd = g_io_channel_unix_get_fd(gio);
453 return (adaptor->fn(fd, condition, adaptor->data) == 0);
457 gio_poll_destroy(gpointer
data)
459 struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)
data;
464 if (adaptor->is_used == 0) {
465 crm_trace(
"Marking adaptor %p unused", adaptor);
479 conv_prio_libqb2glib(
enum qb_loop_priority prio)
482 case QB_LOOP_LOW:
return G_PRIORITY_LOW;
483 case QB_LOOP_HIGH:
return G_PRIORITY_HIGH;
484 default:
return G_PRIORITY_DEFAULT;
497 static enum qb_ipcs_rate_limit
498 conv_libqb_prio2ratelimit(
enum qb_loop_priority prio)
501 case QB_LOOP_LOW:
return QB_IPCS_RATE_SLOW;
502 case QB_LOOP_HIGH:
return QB_IPCS_RATE_FAST;
503 default:
return QB_IPCS_RATE_NORMAL;
508 gio_poll_dispatch_update(
enum qb_loop_priority p, int32_t fd, int32_t evts,
509 void *
data, qb_ipcs_dispatch_fn_t fn, int32_t add)
511 struct gio_to_qb_poll *adaptor;
515 res = qb_array_index(gio_map, fd, (
void **)&adaptor);
517 crm_err(
"Array lookup failed for fd=%d: %d", fd, res);
521 crm_trace(
"Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
523 if (add && adaptor->source) {
524 crm_err(
"Adaptor for descriptor %d is still in-use", fd);
527 if (!add && !adaptor->is_used) {
528 crm_err(
"Adaptor for descriptor %d is not in-use", fd);
533 channel = g_io_channel_unix_new(fd);
535 crm_err(
"No memory left to add fd=%d", fd);
539 if (adaptor->source) {
540 g_source_remove(adaptor->source);
545 evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
548 adaptor->events = evts;
549 adaptor->data =
data;
553 g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
554 gio_read_socket, adaptor, gio_poll_destroy);
565 g_io_channel_unref(channel);
567 crm_trace(
"Added to mainloop with gsource id=%d", adaptor->source);
568 if (adaptor->source > 0) {
576 gio_poll_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t evts,
577 void *
data, qb_ipcs_dispatch_fn_t fn)
579 return gio_poll_dispatch_update(p, fd, evts,
data, fn, QB_TRUE);
583 gio_poll_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t evts,
584 void *
data, qb_ipcs_dispatch_fn_t fn)
586 return gio_poll_dispatch_update(p, fd, evts,
data, fn, QB_FALSE);
590 gio_poll_dispatch_del(int32_t fd)
592 struct gio_to_qb_poll *adaptor;
595 if (qb_array_index(gio_map, fd, (
void **)&adaptor) == 0) {
596 if (adaptor->source) {
597 g_source_remove(adaptor->source);
606 .dispatch_add = gio_poll_dispatch_add,
607 .dispatch_mod = gio_poll_dispatch_mod,
608 .dispatch_del = gio_poll_dispatch_del,
611 static enum qb_ipc_type
612 pick_ipc_type(
enum qb_ipc_type requested)
616 if (env && strcmp(
"shared-mem", env) == 0) {
618 }
else if (env && strcmp(
"socket", env) == 0) {
619 return QB_IPC_SOCKET;
620 }
else if (env && strcmp(
"posix", env) == 0) {
621 return QB_IPC_POSIX_MQ;
622 }
else if (env && strcmp(
"sysv", env) == 0) {
623 return QB_IPC_SYSV_MQ;
624 }
else if (requested == QB_IPC_NATIVE) {
638 struct qb_ipcs_service_handlers *callbacks)
645 struct qb_ipcs_service_handlers *callbacks,
646 enum qb_loop_priority prio)
649 qb_ipcs_service_t *server = NULL;
651 if (gio_map == NULL) {
652 gio_map = qb_array_create_2(64,
sizeof(
struct gio_to_qb_poll), 1);
655 server = qb_ipcs_create(
name, 0, pick_ipc_type(
type), callbacks);
657 if (server == NULL) {
658 crm_err(
"Could not create %s IPC server: %s (%d)",
663 if (prio != QB_LOOP_MED) {
664 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
671 rc = qb_ipcs_run(server);
684 qb_ipcs_destroy(server);
688 struct mainloop_io_s {
697 int (*dispatch_fn_ipc) (
const char *buffer, ssize_t length, gpointer userdata);
698 int (*dispatch_fn_io) (gpointer userdata);
699 void (*destroy_fn) (gpointer userdata);
714 mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer
data)
716 gboolean rc = G_SOURCE_CONTINUE;
719 pcmk__assert(client->fd == g_io_channel_unix_get_fd(gio));
721 if (condition & G_IO_IN) {
729 crm_trace(
"Could not read IPC message from %s: %s (%ld)",
732 }
else if (client->dispatch_fn_ipc) {
735 crm_trace(
"New %ld-byte IPC message from %s " 736 "after I/O condition %d",
737 read_rc, client->name, (
int) condition);
738 if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
739 crm_trace(
"Connection to %s no longer required", client->name);
740 rc = G_SOURCE_REMOVE;
744 }
while ((rc == G_SOURCE_CONTINUE) && (read_rc > 0) && --max > 0);
747 crm_trace(
"New I/O event for %s after I/O condition %d",
748 client->name, (
int) condition);
749 if (client->dispatch_fn_io) {
750 if (client->dispatch_fn_io(client->userdata) < 0) {
751 crm_trace(
"Connection to %s no longer required", client->name);
752 rc = G_SOURCE_REMOVE;
759 crm_err(
"Connection to %s closed " QB_XS
" client=%p condition=%d",
760 client->name, client, condition);
761 rc = G_SOURCE_REMOVE;
763 }
else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
764 crm_trace(
"The connection %s[%p] has been closed (I/O condition=%d)",
765 client->name, client, condition);
766 rc = G_SOURCE_REMOVE;
768 }
else if ((condition & G_IO_IN) == 0) {
796 crm_err(
"Strange condition: %d", condition);
806 mainloop_gio_destroy(gpointer c)
809 char *c_name = strdup(client->name);
814 crm_trace(
"Destroying client %s[%p]", c_name, c);
820 if (client->destroy_fn) {
821 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
823 client->destroy_fn = NULL;
824 destroy_fn(client->userdata);
834 crm_trace(
"Destroyed client %s[%p]", c_name, c);
836 free(client->name); client->name = NULL;
867 const char *ipc_name = NULL;
869 CRM_CHECK((ipc != NULL) && (callbacks != NULL),
return EINVAL);
880 crm_debug(
"Could not obtain file descriptor for %s IPC: %s",
887 if (*source == NULL) {
893 (*source)->ipc = ipc;
894 (*source)->destroy_fn = callbacks->
destroy;
895 (*source)->dispatch_fn_ipc = callbacks->
dispatch;
910 return timer->period_ms;
926 fprintf(stderr,
"Connection to %s failed: %s",
963 if (client == NULL) {
966 client->name = strdup(
name);
967 client->userdata = userdata;
970 client->destroy_fn = callbacks->
destroy;
971 client->dispatch_fn_io = callbacks->
dispatch;
975 client->channel = g_io_channel_unix_new(fd);
977 g_io_add_watch_full(client->channel, priority,
978 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
979 client, mainloop_gio_destroy);
990 g_io_channel_unref(client->channel);
991 crm_trace(
"Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
1002 if (client != NULL) {
1003 crm_trace(
"Removing client %s[%p]", client->name, client);
1004 if (client->source) {
1008 g_source_remove(client->source);
1013 static GList *child_list = NULL;
1030 return child->timeout;
1036 return child->privatedata;
1042 child->privatedata = NULL;
1049 if (child->timerid != 0) {
1050 crm_trace(
"Removing timer %d", child->timerid);
1051 g_source_remove(child->timerid);
1064 crm_debug(
"Kill pid %d only. leave group intact.", child->pid);
1065 rc = kill(child->pid, SIGKILL);
1067 crm_debug(
"Kill pid %d's group", child->pid);
1068 rc = kill(-child->pid, SIGKILL);
1072 if (errno != ESRCH) {
1073 crm_perror(LOG_ERR,
"kill(%d, KILL) failed", child->pid);
1081 child_timeout_callback(gpointer p)
1087 if (child->timeout) {
1088 crm_warn(
"%s process (PID %d) will not die!", child->desc, (
int)child->pid);
1092 rc = child_kill_helper(child);
1098 child->timeout = TRUE;
1099 crm_debug(
"%s process (PID %d) timed out", child->desc, (
int)child->pid);
1113 bool callback_needed =
true;
1115 rc = waitpid(child->pid, &status,
flags);
1117 crm_trace(
"Child process %d (%s) still active",
1118 child->pid, child->desc);
1119 callback_needed =
false;
1121 }
else if (rc != child->pid) {
1133 crm_notice(
"Wait for child process %d (%s) interrupted: %s",
1136 }
else if (WIFEXITED(status)) {
1137 exitcode = WEXITSTATUS(status);
1138 crm_trace(
"Child process %d (%s) exited with status %d",
1139 child->pid, child->desc, exitcode);
1141 }
else if (WIFSIGNALED(status)) {
1142 signo = WTERMSIG(status);
1143 crm_trace(
"Child process %d (%s) exited with signal %d (%s)",
1144 child->pid, child->desc, signo, strsignal(signo));
1146 #ifdef WCOREDUMP // AIX, SunOS, maybe others 1147 }
else if (WCOREDUMP(status)) {
1149 crm_err(
"Child process %d (%s) dumped core",
1150 child->pid, child->desc);
1154 crm_trace(
"Child process %d (%s) stopped or continued",
1155 child->pid, child->desc);
1156 callback_needed =
false;
1159 if (callback_needed && child->callback) {
1160 child->callback(child, child->pid, core, signo, exitcode);
1162 return callback_needed;
1166 child_death_dispatch(
int signal)
1168 for (GList *iter = child_list; iter; ) {
1169 GList *saved = iter;
1173 if (child_waitpid(child, WNOHANG)) {
1174 crm_trace(
"Removing completed process %d from child list",
1176 child_list = g_list_remove_link(child_list, saved);
1184 child_signal_init(gpointer p)
1191 child_death_dispatch(SIGCHLD);
1203 int waitflags = 0, rc = 0;
1205 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1207 if (
pid == child->pid) {
1212 if (match == NULL) {
1216 rc = child_kill_helper(match);
1223 crm_trace(
"Waiting for signal that child process %d completed",
1227 }
else if(rc != 0) {
1231 waitflags = WNOHANG;
1234 if (!child_waitpid(match, waitflags)) {
1239 child_list = g_list_remove(child_list, match);
1255 static bool need_init = TRUE;
1260 child->timeout = FALSE;
1261 child->privatedata = privatedata;
1262 child->callback = callback;
1263 child->flags =
flags;
1270 child_list = g_list_append(child_list, child);
1290 mainloop_timer_cb(gpointer user_data)
1293 bool repeat = FALSE;
1294 struct mainloop_timer_s *t = user_data;
1304 crm_trace(
"Invoking callbacks for timer %s", t->name);
1306 if(t->cb(t->userdata) == FALSE) {
1307 crm_trace(
"Timer %s complete", t->name);
1323 if(t && t->id != 0) {
1333 if(t && t->period_ms > 0) {
1334 crm_trace(
"Starting timer %s", t->name);
1342 if(t && t->id != 0) {
1343 crm_trace(
"Stopping timer %s", t->name);
1344 g_source_remove(t->id);
1355 last = t->period_ms;
1356 t->period_ms = period_ms;
1359 if(t && t->id != 0 && last != t->period_ms) {
1376 t->period_ms = period_ms;
1379 t->userdata = userdata;
1380 crm_trace(
"Created timer %s with %p %p", t->name, userdata, t->userdata);
1388 crm_trace(
"Destroying timer %s", t->name);
1400 drain_timeout_cb(gpointer user_data)
1402 bool *timeout_popped = (
bool*) user_data;
1404 *timeout_popped = TRUE;
1417 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1418 GMainContext *ctx = g_main_loop_get_context(mloop);
1423 for (
int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1424 g_main_context_dispatch(ctx);
1426 g_main_loop_quit(mloop);
1446 bool timeout_popped = FALSE;
1448 GMainContext *ctx = NULL;
1452 ctx = g_main_loop_get_context(mloop);
1454 time_t start_time = time(NULL);
1457 while (!timeout_popped
1458 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1459 g_main_context_iteration(ctx, TRUE);
1462 if (!timeout_popped && (timer > 0)) {
1463 g_source_remove(timer);
void * mainloop_child_userdata(mainloop_child_t *child)
#define CRM_CHECK(expr, failure_action)
pid_t mainloop_child_pid(mainloop_child_t *child)
bool mainloop_timer_running(mainloop_timer_t *t)
#define crm_notice(fmt, args...)
const char * pcmk_strerror(int rc)
struct signal_s crm_signal_t
void mainloop_del_fd(mainloop_io_t *client)
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
enum pcmk_ipc_server type
gboolean mainloop_destroy_signal(int sig)
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Create a trigger to be used as a mainloop source.
void(* destroy)(gpointer userdata)
Destroy function for mainloop file descriptor client data.
void mainloop_trigger_complete(crm_trigger_t *trig)
const char * mainloop_child_name(mainloop_child_t *child)
void pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
Drain some remaining main loop events then quit it.
struct mainloop_timer_s mainloop_timer_t
struct mainloop_io_s mainloop_io_t
struct mainloop_child_s mainloop_child_t
long crm_ipc_read(crm_ipc_t *client)
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
struct qb_ipcs_poll_handlers gio_poll_funcs
const char * pcmk__env_option(const char *option)
guint pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
Get period for mainloop timer.
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Wrappers for and extensions to glib mainloop.
void mainloop_timer_del(mainloop_timer_t *t)
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
const char * crm_ipc_buffer(crm_ipc_t *client)
void mainloop_del_ipc_client(mainloop_io_t *client)
struct trigger_s crm_trigger_t
int pcmk__connect_generic_ipc(crm_ipc_t *ipc)
#define PCMK__ENV_IPC_TYPE
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
#define crm_warn(fmt, args...)
int mainloop_child_timeout(mainloop_child_t *child)
#define crm_debug(fmt, args...)
struct crm_ipc_s crm_ipc_t
void mainloop_set_trigger(crm_trigger_t *source)
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
void(* destroy)(gpointer userdata)
Destroy function for mainloop IPC connection client data.
#define crm_trace(fmt, args...)
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
void mainloop_timer_start(mainloop_timer_t *t)
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Wrappers for and extensions to libxml2.
void mainloop_clear_child_userdata(mainloop_child_t *child)
gboolean mainloop_child_kill(pid_t pid)
void mainloop_timer_stop(mainloop_timer_t *t)
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
unsigned int crm_ipc_default_buffer_size(void)
Return pacemaker's default IPC buffer size.
void crm_ipc_destroy(crm_ipc_t *client)
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
#define pcmk__str_copy(str)
bool crm_ipc_connected(crm_ipc_t *client)
#define pcmk__assert(expr)
unsigned int crm_log_level
void(* sighandler_t)(int)
const char * crm_ipc_name(crm_ipc_t *client)
#define crm_perror(level, fmt, args...)
Send a system error message to both the log and stderr.
qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio)
Start server-side API end-point, hooked into the internal event loop.
#define crm_err(fmt, args...)
int pcmk__ipc_fd(crm_ipc_t *ipc, int *fd)
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Create a new (legacy) object for using Pacemaker daemon IPC.
void mainloop_cleanup(void)
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
int pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata, const struct ipc_client_callbacks *callbacks, mainloop_io_t **source)
Connect to IPC and add it as a main loop source.
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool(*check)(guint))
Process main loop events while a certain condition is met.
#define pcmk__assert_alloc(nmemb, size)
sighandler_t crm_signal_handler(int sig, sighandler_t dispatch)
void crm_ipc_close(crm_ipc_t *client)
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Dispatch function for an IPC connection used as mainloop source.
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1