36 #include <qb/qbarray.h> 
   38 struct mainloop_child_s {
 
   62 crm_trigger_prepare(GSource * source, gint * timeout)
 
   89 crm_trigger_check(GSource * source)
 
   97 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
 
  106     trig->trigger = FALSE;
 
  109         rc = callback(trig->user_data);
 
  111             crm_trace(
"Trigger handler %p not yet complete", trig);
 
  112             trig->running = TRUE;
 
  120 crm_trigger_finalize(GSource * source)
 
  122     crm_trace(
"Trigger %p destroyed", source);
 
  128   gpointer callback_data;
 
  129   GSourceCallbackFuncs *callback_funcs;
 
  131   const GSourceFuncs *source_funcs;
 
  134   GMainContext *context;
 
  151 g_source_refcount(GSource * source)
 
  155         struct _GSourceCopy *evil = (
struct _GSourceCopy*)source;
 
  156         return evil->ref_count;
 
  161 static int g_source_refcount(GSource * source)
 
  167 static GSourceFuncs crm_trigger_funcs = {
 
  170     crm_trigger_dispatch,
 
  171     crm_trigger_finalize,
 
  175 mainloop_setup_trigger(GSource * source, 
int priority, 
int (*dispatch) (gpointer user_data),
 
  183     trigger->trigger = FALSE;
 
  184     trigger->user_data = userdata;
 
  187         g_source_set_callback(source, dispatch, trigger, NULL);
 
  190     g_source_set_priority(source, priority);
 
  191     g_source_set_can_recurse(source, FALSE);
 
  193     crm_trace(
"Setup %p with ref-count=%u", source, g_source_refcount(source));
 
  194     trigger->id = g_source_attach(source, NULL);
 
  195     crm_trace(
"Attached %p with ref-count=%u", source, g_source_refcount(source));
 
  203     crm_trace(
"Trigger handler %p complete", trig);
 
  204     trig->running = FALSE;
 
  215     GSource *source = NULL;
 
  218     source = g_source_new(&crm_trigger_funcs, 
sizeof(
crm_trigger_t));
 
  221     return mainloop_setup_trigger(source, priority, dispatch, userdata);
 
  228         source->trigger = TRUE;
 
  241     gs = (GSource *)source;
 
  243     if(g_source_refcount(gs) > 2) {
 
  244         crm_info(
"Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
 
  247     g_source_destroy(gs); 
 
  259 typedef struct signal_s {
 
  261     void (*handler) (
int sig);
 
  269 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
 
  273     if(sig->signal != SIGCHLD) {
 
  275                    strsignal(sig->signal), sig->signal,
 
  276                    (sig->handler? 
"invoking" : 
"no"));
 
  279     sig->trigger.trigger = FALSE;
 
  281         sig->handler(sig->signal);
 
  287 mainloop_signal_handler(
int sig)
 
  289     if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
 
  294 static GSourceFuncs crm_signal_funcs = {
 
  298     crm_trigger_finalize,
 
  306     struct sigaction old;
 
  308     if (sigemptyset(&mask) < 0) {
 
  309         crm_perror(LOG_ERR, 
"Call to sigemptyset failed");
 
  313     memset(&sa, 0, 
sizeof(
struct sigaction));
 
  314     sa.sa_handler = dispatch;
 
  315     sa.sa_flags = SA_RESTART;
 
  318     if (sigaction(sig, &sa, &old) < 0) {
 
  319         crm_perror(LOG_ERR, 
"Could not install signal handler for signal %d", sig);
 
  329     GSource *source = NULL;
 
  330     int priority = G_PRIORITY_HIGH - 1;
 
  332     if (sig == SIGTERM) {
 
  340     if (sig >= NSIG || sig < 0) {
 
  341         crm_err(
"Signal %d is out of range", sig);
 
  344     } 
else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
 
  345         crm_trace(
"Signal handler for %d is already installed", sig);
 
  348     } 
else if (crm_signals[sig] != NULL) {
 
  349         crm_err(
"Different signal handler for %d is already installed", sig);
 
  354     source = g_source_new(&crm_signal_funcs, 
sizeof(
crm_signal_t));
 
  356     crm_signals[sig] = (
crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
 
  359     crm_signals[sig]->handler = dispatch;
 
  360     crm_signals[sig]->signal = sig;
 
  362     if (
crm_signal(sig, mainloop_signal_handler) == FALSE) {
 
  365         crm_signals[sig] = NULL;
 
  376     if (siginterrupt(sig, 1) < 0) {
 
  377         crm_perror(LOG_INFO, 
"Could not enable system call interruptions for signal %d", sig);
 
  389     if (sig >= NSIG || sig < 0) {
 
  390         crm_err(
"Signal %d is out of range", sig);
 
  394         crm_perror(LOG_ERR, 
"Could not uninstall signal handler for signal %d", sig);
 
  397     } 
else if (crm_signals[sig] == NULL) {
 
  402     tmp = crm_signals[sig];
 
  403     crm_signals[sig] = NULL;
 
  408 static qb_array_t *gio_map = NULL;
 
  414         qb_array_free(gio_map);
 
  421 struct gio_to_qb_poll {
 
  426     qb_ipcs_dispatch_fn_t fn;
 
  427     enum qb_loop_priority p;
 
  431 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer 
data)
 
  433     struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)data;
 
  434     gint fd = g_io_channel_unix_get_fd(gio);
 
  436     crm_trace(
"%p.%d %d", data, fd, condition);
 
  442     return (adaptor->fn(fd, condition, adaptor->data) == 0);
 
  446 gio_poll_destroy(gpointer data)
 
  448     struct gio_to_qb_poll *adaptor = (
struct gio_to_qb_poll *)data;
 
  453     if (adaptor->is_used == 0) {
 
  454         crm_trace(
"Marking adaptor %p unused", adaptor);
 
  460 gio_poll_dispatch_update(
enum qb_loop_priority p, 
int32_t fd, 
int32_t evts,
 
  461                          void *data, qb_ipcs_dispatch_fn_t fn, 
int32_t add)
 
  463     struct gio_to_qb_poll *adaptor;
 
  467     res = qb_array_index(gio_map, fd, (
void **)&adaptor);
 
  469         crm_err(
"Array lookup failed for fd=%d: %d", fd, res);
 
  473     crm_trace(
"Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
 
  475     if (add && adaptor->source) {
 
  476         crm_err(
"Adaptor for descriptor %d is still in-use", fd);
 
  479     if (!add && !adaptor->is_used) {
 
  480         crm_err(
"Adaptor for descriptor %d is not in-use", fd);
 
  485     channel = g_io_channel_unix_new(fd);
 
  487         crm_err(
"No memory left to add fd=%d", fd);
 
  491     if (adaptor->source) {
 
  492         g_source_remove(adaptor->source);
 
  497     evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
 
  500     adaptor->events = evts;
 
  501     adaptor->data = 
data;
 
  505         g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
 
  517     g_io_channel_unref(channel);
 
  519     crm_trace(
"Added to mainloop with gsource id=%d", adaptor->source);
 
  520     if (adaptor->source > 0) {
 
  528 gio_poll_dispatch_add(
enum qb_loop_priority p, 
int32_t fd, 
int32_t evts,
 
  529                       void *data, qb_ipcs_dispatch_fn_t fn)
 
  531     return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
 
  535 gio_poll_dispatch_mod(
enum qb_loop_priority p, 
int32_t fd, 
int32_t evts,
 
  536                       void *data, qb_ipcs_dispatch_fn_t fn)
 
  538     return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
 
  542 gio_poll_dispatch_del(
int32_t fd)
 
  544     struct gio_to_qb_poll *adaptor;
 
  547     if (qb_array_index(gio_map, fd, (
void **)&adaptor) == 0) {
 
  548         if (adaptor->source) {
 
  549             g_source_remove(adaptor->source);
 
  558     .dispatch_add = gio_poll_dispatch_add,
 
  559     .dispatch_mod = gio_poll_dispatch_mod,
 
  560     .dispatch_del = gio_poll_dispatch_del,
 
  563 static enum qb_ipc_type
 
  564 pick_ipc_type(
enum qb_ipc_type requested)
 
  566     const char *env = getenv(
"PCMK_ipc_type");
 
  568     if (env && strcmp(
"shared-mem", env) == 0) {
 
  570     } 
else if (env && strcmp(
"socket", env) == 0) {
 
  571         return QB_IPC_SOCKET;
 
  572     } 
else if (env && strcmp(
"posix", env) == 0) {
 
  573         return QB_IPC_POSIX_MQ;
 
  574     } 
else if (env && strcmp(
"sysv", env) == 0) {
 
  575         return QB_IPC_SYSV_MQ;
 
  576     } 
else if (requested == QB_IPC_NATIVE) {
 
  590                         struct qb_ipcs_service_handlers * callbacks)
 
  593     qb_ipcs_service_t *server = NULL;
 
  595     if (gio_map == NULL) {
 
  596         gio_map = qb_array_create_2(64, 
sizeof(
struct gio_to_qb_poll), 1);
 
  600     server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
 
  602 #ifdef HAVE_IPCS_GET_BUFFER_SIZE 
  607     qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
 
  609     rc = qb_ipcs_run(server);
 
  622         qb_ipcs_destroy(server);
 
  626 struct mainloop_io_s {
 
  635     int (*dispatch_fn_ipc) (
const char *buffer, ssize_t length, gpointer userdata);
 
  636     int (*dispatch_fn_io) (gpointer userdata);
 
  637     void (*destroy_fn) (gpointer userdata);
 
  642 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
 
  644     gboolean keep = TRUE;
 
  647     CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
 
  649     if (condition & G_IO_IN) {
 
  657                     crm_trace(
"Message acquisition from %s[%p] failed: %s (%ld)",
 
  660                 } 
else if (client->dispatch_fn_ipc) {
 
  663                     crm_trace(
"New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
 
  664                     if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
 
  665                         crm_trace(
"Connection to %s no longer required", client->name);
 
  670             } 
while (keep && rc > 0 && --max > 0);
 
  673             crm_trace(
"New message from %s[%p] %u", client->name, client, condition);
 
  674             if (client->dispatch_fn_io) {
 
  675                 if (client->dispatch_fn_io(client->userdata) < 0) {
 
  676                     crm_trace(
"Connection to %s no longer required", client->name);
 
  684         crm_err(
"Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
 
  687     } 
else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
 
  688         crm_trace(
"The connection %s[%p] has been closed (I/O condition=%d)",
 
  689                   client->name, client, condition);
 
  692     } 
else if ((condition & G_IO_IN) == 0) {
 
  720         crm_err(
"Strange condition: %d", condition);
 
  730 mainloop_gio_destroy(gpointer c)
 
  733     char *c_name = strdup(client->name);
 
  738     crm_trace(
"Destroying client %s[%p]", c_name, c);
 
  744     if (client->destroy_fn) {
 
  745         void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
 
  747         client->destroy_fn = NULL;
 
  748         destroy_fn(client->userdata);
 
  758     crm_trace(
"Destroyed client %s[%p]", c_name, c);
 
  760     free(client->name); client->name = NULL;
 
  779     if (client == NULL) {
 
  789     client->destroy_fn = callbacks->
destroy;
 
  790     client->dispatch_fn_ipc = callbacks->
dispatch;
 
  817         if (client == NULL) {
 
  820         client->name = strdup(name);
 
  821         client->userdata = userdata;
 
  824             client->destroy_fn = callbacks->
destroy;
 
  825             client->dispatch_fn_io = callbacks->
dispatch;
 
  829         client->channel = g_io_channel_unix_new(fd);
 
  831             g_io_add_watch_full(client->channel, priority,
 
  832                                 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
 
  833                                 client, mainloop_gio_destroy);
 
  844         g_io_channel_unref(client->channel);
 
  845         crm_trace(
"Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
 
  856     if (client != NULL) {
 
  857         crm_trace(
"Removing client %s[%p]", client->name, client);
 
  858         if (client->source) {
 
  862             g_source_remove(client->source);
 
  884     return child->timeout;
 
  890     return child->privatedata;
 
  896     child->privatedata = NULL;
 
  903     if (child->timerid != 0) {
 
  904         crm_trace(
"Removing timer %d", child->timerid);
 
  905         g_source_remove(child->timerid);
 
  918         crm_debug(
"Kill pid %d only. leave group intact.", child->pid);
 
  919         rc = kill(child->pid, SIGKILL);
 
  921         crm_debug(
"Kill pid %d's group", child->pid);
 
  922         rc = kill(-child->pid, SIGKILL);
 
  926         if (errno != ESRCH) {
 
  927             crm_perror(LOG_ERR, 
"kill(%d, KILL) failed", child->pid);
 
  935 child_timeout_callback(gpointer p)
 
  941     if (child->timeout) {
 
  942         crm_crit(
"%s process (PID %d) will not die!", child->desc, (
int)child->pid);
 
  946     rc = child_kill_helper(child);
 
  952     child->timeout = TRUE;
 
  953     crm_warn(
"%s process (PID %d) timed out", child->desc, (
int)child->pid);
 
  955     child->timerid = g_timeout_add(5000, child_timeout_callback, child);
 
  968     rc = waitpid(child->pid, &status, flags);
 
  970         crm_perror(LOG_DEBUG, 
"wait(%d) = %d", child->pid, rc);
 
  973     } 
else if(rc != child->pid) {
 
  977         crm_perror(LOG_ERR, 
"Call to waitpid(%d) failed", child->pid);
 
  980         crm_trace(
"Managed process %d exited: %p", child->pid, child);
 
  982         if (WIFEXITED(status)) {
 
  983             exitcode = WEXITSTATUS(status);
 
  984             crm_trace(
"Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
 
  986         } 
else if (WIFSIGNALED(status)) {
 
  987             signo = WTERMSIG(status);
 
  988             crm_trace(
"Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
 
  991         if (WCOREDUMP(status)) {
 
  993             crm_err(
"Managed process %d (%s) dumped core", child->pid, child->desc);
 
  998     if (child->callback) {
 
  999         child->callback(child, child->pid, core, signo, exitcode);
 
 1005 child_death_dispatch(
int signal)
 
 1013         exited = child_waitpid(child, WNOHANG);
 
 1018         if (exited == FALSE) {
 
 1021         crm_trace(
"Removing process entry %p for %d", child, child->pid);
 
 1023         child_list = g_list_remove_link(child_list, saved);
 
 1030 child_signal_init(gpointer p)
 
 1037     child_death_dispatch(SIGCHLD);
 
 1049     int waitflags = 0, rc = 0;
 
 1051     for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
 
 1053         if (pid == child->pid) {
 
 1058     if (match == NULL) {
 
 1062     rc = child_kill_helper(match);
 
 1073         crm_trace(
"Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
 
 1076     } 
else if(rc != 0) {
 
 1080         waitflags = WNOHANG;
 
 1083     if (child_waitpid(match, waitflags) == FALSE) {
 
 1088     child_list = g_list_remove(child_list, match);
 
 1098                    void (*callback) (
mainloop_child_t * p, pid_t pid, 
int core, 
int signo, 
int exitcode))
 
 1100     static bool need_init = TRUE;
 
 1105     child->timeout = FALSE;
 
 1106     child->privatedata = privatedata;
 
 1107     child->callback = callback;
 
 1108     child->flags = 
flags;
 
 1111         child->desc = strdup(desc);
 
 1115         child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
 
 1118     child_list = g_list_append(child_list, child);
 
 1126         g_timeout_add(1, child_signal_init, NULL);
 
 1132                    void (*callback) (
mainloop_child_t * p, pid_t pid, 
int core, 
int signo, 
int exitcode))
 
 1137 struct mainloop_timer_s {
 
 1148 static gboolean mainloop_timer_cb(gpointer user_data)
 
 1151     bool repeat = FALSE;
 
 1152     struct mainloop_timer_s *t = user_data;
 
 1162         crm_trace(
"Invoking callbacks for timer %s", t->name);
 
 1164         if(t->cb(t->userdata) == FALSE) {
 
 1165             crm_trace(
"Timer %s complete", t->name);
 
 1180     if(t && t->id != 0) {
 
 1189     if(t && t->period_ms > 0) {
 
 1190         crm_trace(
"Starting timer %s", t->name);
 
 1191         t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
 
 1197     if(t && t->id != 0) {
 
 1198         crm_trace(
"Stopping timer %s", t->name);
 
 1199         g_source_remove(t->id);
 
 1209         last = t->period_ms;
 
 1210         t->period_ms = period_ms;
 
 1213     if(t && t->id != 0 && last != t->period_ms) {
 
 1232         t->period_ms = period_ms;
 
 1235         t->userdata = userdata;
 
 1236         crm_trace(
"Created timer %s with %p %p", t->name, userdata, t->userdata);
 
 1245         crm_trace(
"Destroying timer %s", t->name);
 
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component. 
 
#define crm_notice(fmt, args...)
 
struct signal_s crm_signal_t
 
#define crm_crit(fmt, args...)
 
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
 
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
 
void mainloop_timer_start(mainloop_timer_t *t)
 
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
 
void mainloop_timer_del(mainloop_timer_t *t)
 
gboolean mainloop_child_kill(pid_t pid)
 
int crm_ipc_get_fd(crm_ipc_t *client)
 
const char * pcmk_strerror(int rc)
 
struct mainloop_timer_s mainloop_timer_t
 
struct mainloop_io_s mainloop_io_t
 
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *userdata, enum mainloop_child_flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
 
struct mainloop_child_s mainloop_child_t
 
void mainloop_set_trigger(crm_trigger_t *source)
 
void mainloop_cleanup(void)
 
int(* dispatch)(gpointer userdata)
 
pid_t mainloop_child_pid(mainloop_child_t *child)
 
long crm_ipc_read(crm_ipc_t *client)
 
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
 
struct qb_ipcs_poll_handlers gio_poll_funcs
 
void mainloop_timer_stop(mainloop_timer_t *t)
 
Wrappers for and extensions to glib mainloop. 
 
void crm_client_init(void)
 
const char * crm_ipc_buffer(crm_ipc_t *client)
 
struct trigger_s crm_trigger_t
 
void(* destroy)(gpointer)
 
#define crm_warn(fmt, args...)
 
#define crm_debug(fmt, args...)
 
void * mainloop_child_userdata(mainloop_child_t *child)
 
struct crm_ipc_s crm_ipc_t
 
void(* destroy)(gpointer userdata)
 
gboolean crm_signal(int sig, void(*dispatch)(int sig))
 
#define crm_trace(fmt, args...)
 
Wrappers for and extensions to libxml2. 
 
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
 
void mainloop_del_ipc_client(mainloop_io_t *client)
 
unsigned int crm_ipc_default_buffer_size(void)
 
void crm_ipc_destroy(crm_ipc_t *client)
 
bool mainloop_timer_running(mainloop_timer_t *t)
 
bool crm_ipc_connected(crm_ipc_t *client)
 
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *userdata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
 
#define crm_perror(level, fmt, args...)
Log a system error message. 
 
void mainloop_trigger_complete(crm_trigger_t *trig)
 
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
 
#define crm_err(fmt, args...)
 
struct mainloop_timer_s mainloop
 
void mainloop_clear_child_userdata(mainloop_child_t *child)
 
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
 
void mainloop_del_fd(mainloop_io_t *client)
 
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
 
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
 
gboolean mainloop_destroy_signal(int sig)
 
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
 
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
 
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
 
void crm_ipc_close(crm_ipc_t *client)
 
const char * mainloop_child_name(mainloop_child_t *child)
 
#define crm_info(fmt, args...)
 
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
 
enum crm_ais_msg_types type
 
int mainloop_child_timeout(mainloop_child_t *child)