pacemaker 3.0.1-16e74fc4da
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
mainloop.c
Go to the documentation of this file.
1/*
2 * Copyright 2004-2025 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10#include <crm_internal.h>
11
12#include <stdlib.h>
13#include <string.h>
14#include <signal.h>
15#include <errno.h>
16
17#include <sys/wait.h>
18
19#include <crm/crm.h>
20#include <crm/common/xml.h>
21#include <crm/common/mainloop.h>
23
24#include <qb/qbarray.h>
25
/* A child process (or process group) tracked by the main loop via
 * mainloop_child_add() / mainloop_child_add_with_flags()
 */
struct mainloop_child_s {
    pid_t pid;          // process ID (negative to track a process group)
    char *desc;         // description used in log messages (freed by child_free())
    unsigned timerid;   // source ID of the pending timeout timer, or 0 if none
    gboolean timeout;   // TRUE once the child was killed after timing out
    void *privatedata;  // caller-supplied data (not freed by child_free())

    enum mainloop_child_flags flags;    // e.g. mainloop_leave_pid_group

    /* Called when the process completes, with its PID, whether it dumped
     * core, the terminating signal (if any), and its exit code
     */
    void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
};
38
/* A custom glib source that is dispatched on demand when its trigger flag is
 * set (see crm_trigger_funcs)
 */
struct trigger_s {
    GSource source;     // must be first: GSource * is cast to crm_trigger_t *
    gboolean running;   // set when the handler returns < 0 (not yet complete)
    gboolean trigger;   // TRUE when a dispatch has been requested
    void *user_data;    // passed to the dispatch callback
    guint id;           // ID returned by g_source_attach()

};
47
// A named, optionally repeating timer managed by the mainloop_timer_* API
struct mainloop_timer_s {
    guint id;           // glib source ID while running, otherwise 0
    guint period_ms;    // timer interval in milliseconds
    bool repeat;        // whether the timer restarts after firing
    char *name;         // name used in log messages (owned by the timer)
    GSourceFunc cb;     // user callback; returning FALSE stops repetition
    void *userdata;     // passed to cb
};
56
57static gboolean
58crm_trigger_prepare(GSource * source, gint * timeout)
59{
60 crm_trigger_t *trig = (crm_trigger_t *) source;
61
62 /* cluster-glue's FD and IPC related sources make use of
63 * g_source_add_poll() but do not set a timeout in their prepare
64 * functions
65 *
66 * This means mainloop's poll() will block until an event for one
67 * of these sources occurs - any /other/ type of source, such as
68 * this one or g_idle_*, that doesn't use g_source_add_poll() is
69 * S-O-L and won't be processed until there is something fd-based
70 * happens.
71 *
72 * Luckily the timeout we can set here affects all sources and
73 * puts an upper limit on how long poll() can take.
74 *
75 * So unconditionally set a small-ish timeout, not too small that
76 * we're in constant motion, which will act as an upper bound on
77 * how long the signal handling might be delayed for.
78 */
79 *timeout = 500; /* Timeout in ms */
80
81 return trig->trigger;
82}
83
84static gboolean
85crm_trigger_check(GSource * source)
86{
87 crm_trigger_t *trig = (crm_trigger_t *) source;
88
89 return trig->trigger;
90}
91
102static gboolean
103crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
104{
105 gboolean rc = G_SOURCE_CONTINUE;
106 crm_trigger_t *trig = (crm_trigger_t *) source;
107
108 if (trig->running) {
109 /* Wait until the existing job is complete before starting the next one */
110 return G_SOURCE_CONTINUE;
111 }
112 trig->trigger = FALSE;
113
114 if (callback) {
115 int callback_rc = callback(trig->user_data);
116
117 if (callback_rc < 0) {
118 crm_trace("Trigger handler %p not yet complete", trig);
119 trig->running = TRUE;
120 } else if (callback_rc == 0) {
121 rc = G_SOURCE_REMOVE;
122 }
123 }
124 return rc;
125}
126
127static void
128crm_trigger_finalize(GSource * source)
129{
130 crm_trace("Trigger %p destroyed", source);
131}
132
133static GSourceFuncs crm_trigger_funcs = {
134 crm_trigger_prepare,
135 crm_trigger_check,
136 crm_trigger_dispatch,
137 crm_trigger_finalize,
138};
139
140static crm_trigger_t *
141mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
142 gpointer userdata)
143{
144 crm_trigger_t *trigger = NULL;
145
146 trigger = (crm_trigger_t *) source;
147
148 trigger->id = 0;
149 trigger->trigger = FALSE;
150 trigger->user_data = userdata;
151
152 if (dispatch) {
153 g_source_set_callback(source, dispatch, trigger, NULL);
154 }
155
156 g_source_set_priority(source, priority);
157 g_source_set_can_recurse(source, FALSE);
158
159 trigger->id = g_source_attach(source, NULL);
160 return trigger;
161}
162
163void
165{
166 crm_trace("Trigger handler %p complete", trig);
167 trig->running = FALSE;
168}
169
183mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
184 gpointer userdata)
185{
186 GSource *source = NULL;
187
188 pcmk__assert(sizeof(crm_trigger_t) > sizeof(GSource));
189 source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
190
191 return mainloop_setup_trigger(source, priority, dispatch, userdata);
192}
193
194void
196{
197 if(source) {
198 source->trigger = TRUE;
199 }
200}
201
202gboolean
204{
205 GSource *gs = NULL;
206
207 if(source == NULL) {
208 return TRUE;
209 }
210
211 gs = (GSource *)source;
212
213 g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
214 g_source_unref(gs); /* The caller no longer carries a reference to source
215 *
216 * At this point the source should be free'd,
217 * unless we're currently processing said
218 * source, in which case mainloop holds an
219 * additional reference and it will be free'd
220 * once our processing completes
221 */
222 return TRUE;
223}
224
225// Define a custom glib source for signal handling
226
227// Data structure for custom glib source
228typedef struct signal_s {
229 crm_trigger_t trigger; // trigger that invoked source (must be first)
230 void (*handler) (int sig); // signal handler
231 int signal; // signal that was received
233
/* Table mapping each signal number to its mainloop signal source, or NULL
 * when no mainloop handler is registered for that signal
 */
static crm_signal_t *crm_signals[NSIG];
236
248static gboolean
249crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
250{
251 crm_signal_t *sig = (crm_signal_t *) source;
252
253 if(sig->signal != SIGCHLD) {
254 crm_notice("Caught '%s' signal " QB_XS " %d (%s handler)",
255 strsignal(sig->signal), sig->signal,
256 (sig->handler? "invoking" : "no"));
257 }
258
259 sig->trigger.trigger = FALSE;
260 if (sig->handler) {
261 sig->handler(sig->signal);
262 }
263 return TRUE;
264}
265
275static void
276mainloop_signal_handler(int sig)
277{
278 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
279 mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
280 }
281}
282
283// Functions implementing our custom glib source for signal handling
284static GSourceFuncs crm_signal_funcs = {
285 crm_trigger_prepare,
286 crm_trigger_check,
287 crm_signal_dispatch,
288 crm_trigger_finalize,
289};
290
305{
306 sigset_t mask;
307 struct sigaction sa;
308 struct sigaction old;
309
310 if (sigemptyset(&mask) < 0) {
311 crm_err("Could not set handler for signal %d: %s",
312 sig, pcmk_rc_str(errno));
313 return SIG_ERR;
314 }
315
316 memset(&sa, 0, sizeof(struct sigaction));
317 sa.sa_handler = dispatch;
318 sa.sa_flags = SA_RESTART;
319 sa.sa_mask = mask;
320
321 if (sigaction(sig, &sa, &old) < 0) {
322 crm_err("Could not set handler for signal %d: %s",
323 sig, pcmk_rc_str(errno));
324 return SIG_ERR;
325 }
326 return old.sa_handler;
327}
328
329static void
330mainloop_destroy_signal_entry(int sig)
331{
332 crm_signal_t *tmp = crm_signals[sig];
333
334 if (tmp != NULL) {
335 crm_signals[sig] = NULL;
336 crm_trace("Unregistering mainloop handler for signal %d", sig);
338 }
339}
340
352gboolean
353mainloop_add_signal(int sig, void (*dispatch) (int sig))
354{
355 GSource *source = NULL;
356 int priority = G_PRIORITY_HIGH - 1;
357
358 if (sig == SIGTERM) {
359 /* TERM is higher priority than other signals,
360 * signals are higher priority than other ipc.
361 * Yes, minus: smaller is "higher"
362 */
363 priority--;
364 }
365
366 if (sig >= NSIG || sig < 0) {
367 crm_err("Signal %d is out of range", sig);
368 return FALSE;
369
370 } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
371 crm_trace("Signal handler for %d is already installed", sig);
372 return TRUE;
373
374 } else if (crm_signals[sig] != NULL) {
375 crm_err("Different signal handler for %d is already installed", sig);
376 return FALSE;
377 }
378
379 pcmk__assert(sizeof(crm_signal_t) > sizeof(GSource));
380 source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
381
382 crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
383 pcmk__assert(crm_signals[sig] != NULL);
384
385 crm_signals[sig]->handler = dispatch;
386 crm_signals[sig]->signal = sig;
387
388 if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
389 mainloop_destroy_signal_entry(sig);
390 return FALSE;
391 }
392
393 return TRUE;
394}
395
396gboolean
398{
399 if (sig >= NSIG || sig < 0) {
400 crm_err("Signal %d is out of range", sig);
401 return FALSE;
402
403 } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
404 crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
405 return FALSE;
406
407 } else if (crm_signals[sig] == NULL) {
408 return TRUE;
409 }
410 mainloop_destroy_signal_entry(sig);
411 return TRUE;
412}
413
/* Array indexed by file descriptor, holding one struct gio_to_qb_poll adaptor
 * per descriptor for the libqb poll handlers; created on first use by
 * mainloop_add_ipc_server_with_prio()
 */
static qb_array_t *gio_map = NULL;
415
416void
418{
419 if (gio_map != NULL) {
420 qb_array_free(gio_map);
421 gio_map = NULL;
422 }
423
424 for (int sig = 0; sig < NSIG; ++sig) {
425 mainloop_destroy_signal_entry(sig);
426 }
427}
428
429/*
430 * libqb...
431 */
// Adaptor connecting a libqb-managed file descriptor to a glib I/O watch
struct gio_to_qb_poll {
    int32_t is_used;    // number of glib watches currently using this adaptor
    guint source;       // glib source ID of the current watch, or 0 if none
    int32_t events;     // events watched (always including HUP, NVAL, ERR)
    void *data;         // libqb data passed back to fn
    qb_ipcs_dispatch_fn_t fn;   // libqb function called when the fd is ready
    enum qb_loop_priority p;    // libqb priority requested for this fd
};
440
441static gboolean
442gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
443{
444 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
445 gint fd = g_io_channel_unix_get_fd(gio);
446
447 crm_trace("%p.%d %d", data, fd, condition);
448
449 /* if this assert get's hit, then there is a race condition between
450 * when we destroy a fd and when mainloop actually gives it up */
451 pcmk__assert(adaptor->is_used > 0);
452
453 return (adaptor->fn(fd, condition, adaptor->data) == 0);
454}
455
456static void
457gio_poll_destroy(gpointer data)
458{
459 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
460
461 adaptor->is_used--;
462 pcmk__assert(adaptor->is_used >= 0);
463
464 if (adaptor->is_used == 0) {
465 crm_trace("Marking adaptor %p unused", adaptor);
466 adaptor->source = 0;
467 }
468}
469
478static gint
479conv_prio_libqb2glib(enum qb_loop_priority prio)
480{
481 switch (prio) {
482 case QB_LOOP_LOW: return G_PRIORITY_LOW;
483 case QB_LOOP_HIGH: return G_PRIORITY_HIGH;
484 default: return G_PRIORITY_DEFAULT; // QB_LOOP_MED
485 }
486}
487
497static enum qb_ipcs_rate_limit
498conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
499{
500 switch (prio) {
501 case QB_LOOP_LOW: return QB_IPCS_RATE_SLOW;
502 case QB_LOOP_HIGH: return QB_IPCS_RATE_FAST;
503 default: return QB_IPCS_RATE_NORMAL; // QB_LOOP_MED
504 }
505}
506
/*
 * Add or modify the glib watch for a libqb-managed file descriptor.
 *
 * p:    libqb priority (converted to a glib priority for the watch)
 * fd:   descriptor to watch (also the index into gio_map)
 * evts: events requested by libqb; HUP, NVAL, and ERR are always added
 * data: libqb data passed back to fn
 * fn:   libqb function to call when the descriptor is ready
 * add:  nonzero to add a new watch, zero to modify an existing one
 *
 * Returns 0 on success; otherwise the negative result of a failed gio_map
 * lookup, or -EEXIST, -ENOENT, -ENOMEM, or -EINVAL.
 */
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
                         void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
    struct gio_to_qb_poll *adaptor;
    GIOChannel *channel;
    int32_t res = 0;

    res = qb_array_index(gio_map, fd, (void **)&adaptor);
    if (res < 0) {
        crm_err("Array lookup failed for fd=%d: %d", fd, res);
        return res;
    }

    crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);

    // Adding requires that no watch is currently active for this descriptor
    if (add && adaptor->source) {
        crm_err("Adaptor for descriptor %d is still in-use", fd);
        return -EEXIST;
    }
    // Modifying requires that the adaptor is currently in use
    if (!add && !adaptor->is_used) {
        crm_err("Adaptor for descriptor %d is not in-use", fd);
        return -ENOENT;
    }

    /* channel is created with ref_count = 1 */
    channel = g_io_channel_unix_new(fd);
    if (!channel) {
        crm_err("No memory left to add fd=%d", fd);
        return -ENOMEM;
    }

    /* Drop any existing watch; glib calls its destroy notify,
     * gio_poll_destroy(), which releases that watch's use of the adaptor
     */
    if (adaptor->source) {
        g_source_remove(adaptor->source);
        adaptor->source = 0;
    }

    /* Unlike the poll() API, glib doesn't tell us about HUPs by default */
    evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);

    adaptor->fn = fn;
    adaptor->events = evts;
    adaptor->data = data;
    adaptor->p = p;
    adaptor->is_used++;     // released by gio_poll_destroy() for this watch
    adaptor->source =
        g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
                            gio_read_socket, adaptor, gio_poll_destroy);

    /* Now that mainloop holds a reference to channel,
     * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
     *
     * This means that channel will be free'd by:
     * g_main_context_dispatch()
     *  -> g_source_destroy_internal()
     *      -> g_source_callback_unref()
     * shortly after gio_poll_destroy() completes
     */
    g_io_channel_unref(channel);

    crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
    if (adaptor->source > 0) {
        return 0;
    }

    return -EINVAL;
}
574
575static int32_t
576gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
577 void *data, qb_ipcs_dispatch_fn_t fn)
578{
579 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
580}
581
582static int32_t
583gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
584 void *data, qb_ipcs_dispatch_fn_t fn)
585{
586 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
587}
588
589static int32_t
590gio_poll_dispatch_del(int32_t fd)
591{
592 struct gio_to_qb_poll *adaptor;
593
594 crm_trace("Looking for fd=%d", fd);
595 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
596 if (adaptor->source) {
597 g_source_remove(adaptor->source);
598 adaptor->source = 0;
599 }
600 }
601 return 0;
602}
603
/* libqb poll handlers that register IPC server descriptors with the glib main
 * loop (installed by mainloop_add_ipc_server_with_prio()); job_add is unused
 */
struct qb_ipcs_poll_handlers gio_poll_funcs = {
    .job_add = NULL,
    .dispatch_add = gio_poll_dispatch_add,
    .dispatch_mod = gio_poll_dispatch_mod,
    .dispatch_del = gio_poll_dispatch_del,
};
610
611static enum qb_ipc_type
612pick_ipc_type(enum qb_ipc_type requested)
613{
614 const char *env = pcmk__env_option(PCMK__ENV_IPC_TYPE);
615
616 if (env && strcmp("shared-mem", env) == 0) {
617 return QB_IPC_SHM;
618 } else if (env && strcmp("socket", env) == 0) {
619 return QB_IPC_SOCKET;
620 } else if (env && strcmp("posix", env) == 0) {
621 return QB_IPC_POSIX_MQ;
622 } else if (env && strcmp("sysv", env) == 0) {
623 return QB_IPC_SYSV_MQ;
624 } else if (requested == QB_IPC_NATIVE) {
625 /* We prefer shared memory because the server never blocks on
626 * send. If part of a message fits into the socket, libqb
627 * needs to block until the remainder can be sent also.
628 * Otherwise the client will wait forever for the remaining
629 * bytes.
630 */
631 return QB_IPC_SHM;
632 }
633 return requested;
634}
635
636qb_ipcs_service_t *
637mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
638 struct qb_ipcs_service_handlers *callbacks)
639{
640 return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
641}
642
643qb_ipcs_service_t *
644mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
645 struct qb_ipcs_service_handlers *callbacks,
646 enum qb_loop_priority prio)
647{
648 int rc = 0;
649 qb_ipcs_service_t *server = NULL;
650
651 if (gio_map == NULL) {
652 gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
653 }
654
655 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
656
657 if (server == NULL) {
658 crm_err("Could not create %s IPC server: %s (%d)",
659 name, pcmk_rc_str(errno), errno);
660 return NULL;
661 }
662
663 if (prio != QB_LOOP_MED) {
664 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
665 }
666
667 // Enforce a minimum IPC buffer size on all clients
668 qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
669 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
670
671 rc = qb_ipcs_run(server);
672 if (rc < 0) {
673 crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
674 return NULL; // qb_ipcs_run() destroys server on failure
675 }
676
677 return server;
678}
679
680void
681mainloop_del_ipc_server(qb_ipcs_service_t * server)
682{
683 if (server) {
684 qb_ipcs_destroy(server);
685 }
686}
687
// A file descriptor or IPC connection watched by the main loop
struct mainloop_io_s {
    char *name;         // name used in log messages (freed on destruction)
    void *userdata;     // passed to the dispatch and destroy callbacks

    int fd;             // descriptor being watched
    guint source;       // glib source ID of the watch
    crm_ipc_t *ipc;     // IPC connection, or NULL for a plain descriptor
    GIOChannel *channel;    // glib channel for fd (reference held by watch)

    // Called per IPC message read; a negative return removes the source
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    // Called when a plain descriptor has input; negative return removes it
    int (*dispatch_fn_io) (gpointer userdata);
    // Called with userdata when the source is destroyed
    void (*destroy_fn) (gpointer userdata);

};
702
/*
 * glib I/O watch callback for descriptors added with mainloop_add_fd().
 *
 * On input, IPC clients have up to 10 messages read and passed to
 * dispatch_fn_ipc; plain descriptors have dispatch_fn_io called once. The
 * source is removed if a dispatch function returns a negative value, if an
 * IPC connection is no longer connected, or on HUP/NVAL/ERR.
 *
 * Returns G_SOURCE_CONTINUE to keep watching, or G_SOURCE_REMOVE.
 */
static gboolean
mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
{
    gboolean rc = G_SOURCE_CONTINUE;
    mainloop_io_t *client = data;

    pcmk__assert(client->fd == g_io_channel_unix_get_fd(gio));

    if (condition & G_IO_IN) {
        if (client->ipc) {
            long read_rc = 0L;
            int max = 10;   // at most 10 messages per callback invocation

            do {
                read_rc = crm_ipc_read(client->ipc);
                if (read_rc <= 0) {
                    crm_trace("Could not read IPC message from %s: %s (%ld)",
                              client->name, pcmk_strerror(read_rc), read_rc);

                    /* In a do/while, continue jumps to the loop condition;
                     * since read_rc <= 0 here, this ends the loop without
                     * calling pcmk__ipc_free_client_buffer().
                     * NOTE(review): presumably so that a partially received
                     * message is kept for the next read -- confirm.
                     */
                    if (read_rc == -EAGAIN) {
                        continue;
                    }

                } else if (client->dispatch_fn_ipc) {
                    const char *buffer = crm_ipc_buffer(client->ipc);

                    crm_trace("New %ld-byte IPC message from %s "
                              "after I/O condition %d",
                              read_rc, client->name, (int) condition);
                    if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
                        crm_trace("Connection to %s no longer required", client->name);
                        rc = G_SOURCE_REMOVE;
                    }
                }

                pcmk__ipc_free_client_buffer(client->ipc);

            } while ((rc == G_SOURCE_CONTINUE) && (read_rc > 0) && --max > 0);

        } else {
            crm_trace("New I/O event for %s after I/O condition %d",
                      client->name, (int) condition);
            if (client->dispatch_fn_io) {
                if (client->dispatch_fn_io(client->userdata) < 0) {
                    crm_trace("Connection to %s no longer required", client->name);
                    rc = G_SOURCE_REMOVE;
                }
            }
        }
    }

    if (client->ipc && !crm_ipc_connected(client->ipc)) {
        crm_err("Connection to %s closed " QB_XS " client=%p condition=%d",
                client->name, client, condition);
        rc = G_SOURCE_REMOVE;

    } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
        crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
                  client->name, client, condition);
        rc = G_SOURCE_REMOVE;

    } else if ((condition & G_IO_IN) == 0) {
        /* None of the conditions this watch requests (IN, HUP, NVAL, ERR) is
         * set. For reading the logged value, glib's GIOCondition bits are:
         * G_IO_IN=1 (data to read), G_IO_PRI=2 (urgent data), G_IO_OUT=4
         * (writable), G_IO_ERR=8 (error), G_IO_HUP=16 (hung up),
         * G_IO_NVAL=32 (descriptor not open).
         */
        crm_err("Strange condition: %d", condition);
    }

    /* G_SOURCE_REMOVE results in mainloop_gio_destroy() being called
     * just before the source is removed from mainloop
     */
    return rc;
}
810
811static void
812mainloop_gio_destroy(gpointer c)
813{
814 mainloop_io_t *client = c;
815 char *c_name = strdup(client->name);
816
817 /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
818 * client->channel will still have ref_count > 0... should be == 1
819 */
820 crm_trace("Destroying client %s[%p]", c_name, c);
821
822 if (client->ipc) {
823 crm_ipc_close(client->ipc);
824 }
825
826 if (client->destroy_fn) {
827 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
828
829 client->destroy_fn = NULL;
830 destroy_fn(client->userdata);
831 }
832
833 if (client->ipc) {
834 crm_ipc_t *ipc = client->ipc;
835
836 client->ipc = NULL;
837 crm_ipc_destroy(ipc);
838 }
839
840 crm_trace("Destroyed client %s[%p]", c_name, c);
841
842 free(client->name); client->name = NULL;
843 free(client);
844
845 free(c_name);
846}
847
866int
867pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
868 const struct ipc_client_callbacks *callbacks,
869 mainloop_io_t **source)
870{
871 int rc = pcmk_rc_ok;
872 int fd = -1;
873 const char *ipc_name = NULL;
874
875 CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
876
877 ipc_name = pcmk__s(crm_ipc_name(ipc), "Pacemaker");
879 if (rc != pcmk_rc_ok) {
880 crm_debug("Connection to %s failed: %s", ipc_name, pcmk_rc_str(rc));
881 return rc;
882 }
883
884 rc = pcmk__ipc_fd(ipc, &fd);
885 if (rc != pcmk_rc_ok) {
886 crm_debug("Could not obtain file descriptor for %s IPC: %s",
887 ipc_name, pcmk_rc_str(rc));
888 crm_ipc_close(ipc);
889 return rc;
890 }
891
892 *source = mainloop_add_fd(ipc_name, priority, fd, userdata, NULL);
893 if (*source == NULL) {
894 rc = errno;
895 crm_ipc_close(ipc);
896 return rc;
897 }
898
899 (*source)->ipc = ipc;
900 (*source)->destroy_fn = callbacks->destroy;
901 (*source)->dispatch_fn_ipc = callbacks->dispatch;
902 return pcmk_rc_ok;
903}
904
912guint
914{
915 if (timer) {
916 return timer->period_ms;
917 }
918 return 0;
919}
920
922mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
923 void *userdata, struct ipc_client_callbacks *callbacks)
924{
925 crm_ipc_t *ipc = crm_ipc_new(name, 0);
926 mainloop_io_t *source = NULL;
927 int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
928 &source);
929
930 if (rc != pcmk_rc_ok) {
931 if (crm_log_level == LOG_STDOUT) {
932 fprintf(stderr, "Connection to %s failed: %s",
933 name, pcmk_rc_str(rc));
934 }
935 crm_ipc_destroy(ipc);
936 if (rc > 0) {
937 errno = rc;
938 } else {
939 errno = ENOTCONN;
940 }
941 return NULL;
942 }
943 return source;
944}
945
946void
951
952crm_ipc_t *
954{
955 if (client) {
956 return client->ipc;
957 }
958 return NULL;
959}
960
962mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
963 struct mainloop_fd_callbacks * callbacks)
964{
965 mainloop_io_t *client = NULL;
966
967 if (fd >= 0) {
968 client = calloc(1, sizeof(mainloop_io_t));
969 if (client == NULL) {
970 return NULL;
971 }
972 client->name = strdup(name);
973 client->userdata = userdata;
974
975 if (callbacks) {
976 client->destroy_fn = callbacks->destroy;
977 client->dispatch_fn_io = callbacks->dispatch;
978 }
979
980 client->fd = fd;
981 client->channel = g_io_channel_unix_new(fd);
982 client->source =
983 g_io_add_watch_full(client->channel, priority,
984 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
985 client, mainloop_gio_destroy);
986
987 /* Now that mainloop now holds a reference to channel,
988 * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
989 *
990 * This means that channel will be free'd by:
991 * g_main_context_dispatch() or g_source_remove()
992 * -> g_source_destroy_internal()
993 * -> g_source_callback_unref()
994 * shortly after mainloop_gio_destroy() completes
995 */
996 g_io_channel_unref(client->channel);
997 crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
998 } else {
999 errno = EINVAL;
1000 }
1001
1002 return client;
1003}
1004
1005void
1007{
1008 if (client != NULL) {
1009 crm_trace("Removing client %s[%p]", client->name, client);
1010 if (client->source) {
1011 /* Results in mainloop_gio_destroy() being called just
1012 * before the source is removed from mainloop
1013 */
1014 g_source_remove(client->source);
1015 }
1016 }
1017}
1018
/* Child processes currently tracked by the main loop (mainloop_child_t *);
 * entries are removed and freed once the child has been reaped
 */
static GList *child_list = NULL;
1020
1021pid_t
1023{
1024 return child->pid;
1025}
1026
1027const char *
1029{
1030 return child->desc;
1031}
1032
1033int
1035{
1036 return child->timeout;
1037}
1038
1039void *
1041{
1042 return child->privatedata;
1043}
1044
1045void
1047{
1048 child->privatedata = NULL;
1049}
1050
1051/* good function name */
1052static void
1053child_free(mainloop_child_t *child)
1054{
1055 if (child->timerid != 0) {
1056 crm_trace("Removing timer %d", child->timerid);
1057 g_source_remove(child->timerid);
1058 child->timerid = 0;
1059 }
1060 free(child->desc);
1061 free(child);
1062}
1063
1064/* terrible function name */
1065static int
1066child_kill_helper(mainloop_child_t *child)
1067{
1068 int rc;
1069 if (child->flags & mainloop_leave_pid_group) {
1070 crm_debug("Kill pid %d only. leave group intact.", child->pid);
1071 rc = kill(child->pid, SIGKILL);
1072 } else {
1073 crm_debug("Kill pid %d's group", child->pid);
1074 rc = kill(-child->pid, SIGKILL);
1075 }
1076
1077 if (rc < 0) {
1078 if (errno != ESRCH) {
1079 crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
1080 }
1081 return -errno;
1082 }
1083 return 0;
1084}
1085
1086static gboolean
1087child_timeout_callback(gpointer p)
1088{
1089 mainloop_child_t *child = p;
1090 int rc = 0;
1091
1092 child->timerid = 0;
1093 if (child->timeout) {
1094 crm_warn("%s process (PID %d) will not die!", child->desc, (int)child->pid);
1095 return FALSE;
1096 }
1097
1098 rc = child_kill_helper(child);
1099 if (rc == -ESRCH) {
1100 /* Nothing left to do. pid doesn't exist */
1101 return FALSE;
1102 }
1103
1104 child->timeout = TRUE;
1105 crm_debug("%s process (PID %d) timed out", child->desc, (int)child->pid);
1106
1107 child->timerid = pcmk__create_timer(5000, child_timeout_callback, child);
1108 return FALSE;
1109}
1110
1111static bool
1112child_waitpid(mainloop_child_t *child, int flags)
1113{
1114 int rc = 0;
1115 int core = 0;
1116 int signo = 0;
1117 int status = 0;
1118 int exitcode = 0;
1119 bool callback_needed = true;
1120
1121 rc = waitpid(child->pid, &status, flags);
1122 if (rc == 0) { // WNOHANG in flags, and child status is not available
1123 crm_trace("Child process %d (%s) still active",
1124 child->pid, child->desc);
1125 callback_needed = false;
1126
1127 } else if (rc != child->pid) {
1128 /* According to POSIX, possible conditions:
1129 * - child->pid was non-positive (process group or any child),
1130 * and rc is specific child
1131 * - errno ECHILD (pid does not exist or is not child)
1132 * - errno EINVAL (invalid flags)
1133 * - errno EINTR (caller interrupted by signal)
1134 *
1135 * @TODO Handle these cases more specifically.
1136 */
1137 signo = SIGCHLD;
1138 exitcode = 1;
1139 crm_notice("Wait for child process %d (%s) interrupted: %s",
1140 child->pid, child->desc, pcmk_rc_str(errno));
1141
1142 } else if (WIFEXITED(status)) {
1143 exitcode = WEXITSTATUS(status);
1144 crm_trace("Child process %d (%s) exited with status %d",
1145 child->pid, child->desc, exitcode);
1146
1147 } else if (WIFSIGNALED(status)) {
1148 signo = WTERMSIG(status);
1149 crm_trace("Child process %d (%s) exited with signal %d (%s)",
1150 child->pid, child->desc, signo, strsignal(signo));
1151
1152#ifdef WCOREDUMP // AIX, SunOS, maybe others
1153 } else if (WCOREDUMP(status)) {
1154 core = 1;
1155 crm_err("Child process %d (%s) dumped core",
1156 child->pid, child->desc);
1157#endif
1158
1159 } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this
1160 crm_trace("Child process %d (%s) stopped or continued",
1161 child->pid, child->desc);
1162 callback_needed = false;
1163 }
1164
1165 if (callback_needed && child->callback) {
1166 child->callback(child, child->pid, core, signo, exitcode);
1167 }
1168 return callback_needed;
1169}
1170
1171static void
1172child_death_dispatch(int signal)
1173{
1174 for (GList *iter = child_list; iter; ) {
1175 GList *saved = iter;
1176 mainloop_child_t *child = iter->data;
1177
1178 iter = iter->next;
1179 if (child_waitpid(child, WNOHANG)) {
1180 crm_trace("Removing completed process %d from child list",
1181 child->pid);
1182 child_list = g_list_remove_link(child_list, saved);
1183 g_list_free(saved);
1184 child_free(child);
1185 }
1186 }
1187}
1188
1189static gboolean
1190child_signal_init(gpointer p)
1191{
1192 crm_trace("Installed SIGCHLD handler");
1193 /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1194 mainloop_add_signal(SIGCHLD, child_death_dispatch);
1195
1196 /* In case they terminated before the signal handler was installed */
1197 child_death_dispatch(SIGCHLD);
1198 return FALSE;
1199}
1200
1201gboolean
1203{
1204 GList *iter;
1205 mainloop_child_t *child = NULL;
1206 mainloop_child_t *match = NULL;
1207 /* It is impossible to block SIGKILL, this allows us to
1208 * call waitpid without WNOHANG flag.*/
1209 int waitflags = 0, rc = 0;
1210
1211 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1212 child = iter->data;
1213 if (pid == child->pid) {
1214 match = child;
1215 }
1216 }
1217
1218 if (match == NULL) {
1219 return FALSE;
1220 }
1221
1222 rc = child_kill_helper(match);
1223 if(rc == -ESRCH) {
1224 /* It's gone, but hasn't shown up in waitpid() yet. Wait until we get
1225 * SIGCHLD and let handler clean it up as normal (so we get the correct
1226 * return code/status). The blocking alternative would be to call
1227 * child_waitpid(match, 0).
1228 */
1229 crm_trace("Waiting for signal that child process %d completed",
1230 match->pid);
1231 return TRUE;
1232
1233 } else if(rc != 0) {
1234 /* If KILL for some other reason set the WNOHANG flag since we
1235 * can't be certain what happened.
1236 */
1237 waitflags = WNOHANG;
1238 }
1239
1240 if (!child_waitpid(match, waitflags)) {
1241 /* not much we can do if this occurs */
1242 return FALSE;
1243 }
1244
1245 child_list = g_list_remove(child_list, match);
1246 child_free(match);
1247 return TRUE;
1248}
1249
1250/* Create/Log a new tracked process
1251 * To track a process group, use -pid
1252 *
1253 * @TODO Using a non-positive pid (i.e. any child, or process group) would
1254 * likely not be useful since we will free the child after the first
1255 * completed process.
1256 */
1257void
1258mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1259 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1260{
1261 static bool need_init = TRUE;
1263
1264 child->pid = pid;
1265 child->timerid = 0;
1266 child->timeout = FALSE;
1267 child->privatedata = privatedata;
1268 child->callback = callback;
1269 child->flags = flags;
1270 child->desc = pcmk__str_copy(desc);
1271
1272 if (timeout) {
1273 child->timerid = pcmk__create_timer(timeout, child_timeout_callback, child);
1274 }
1275
1276 child_list = g_list_append(child_list, child);
1277
1278 if(need_init) {
1279 need_init = FALSE;
1280 /* SIGCHLD processing has to be invoked from mainloop.
1281 * We do not want it to be possible to both add a child pid
1282 * to mainloop, and have the pid's exit callback invoked within
1283 * the same callstack. */
1284 pcmk__create_timer(1, child_signal_init, NULL);
1285 }
1286}
1287
1288void
1289mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1290 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1291{
1292 mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1293}
1294
1295static gboolean
1296mainloop_timer_cb(gpointer user_data)
1297{
1298 int id = 0;
1299 bool repeat = FALSE;
1300 struct mainloop_timer_s *t = user_data;
1301
1302 pcmk__assert(t != NULL);
1303
1304 id = t->id;
1305 t->id = 0; /* Ensure it's unset during callbacks so that
1306 * mainloop_timer_running() works as expected
1307 */
1308
1309 if(t->cb) {
1310 crm_trace("Invoking callbacks for timer %s", t->name);
1311 repeat = t->repeat;
1312 if(t->cb(t->userdata) == FALSE) {
1313 crm_trace("Timer %s complete", t->name);
1314 repeat = FALSE;
1315 }
1316 }
1317
1318 if(repeat) {
1319 /* Restore if repeating */
1320 t->id = id;
1321 }
1322
1323 return repeat;
1324}
1325
1326bool
1328{
1329 if(t && t->id != 0) {
1330 return TRUE;
1331 }
1332 return FALSE;
1333}
1334
1335void
1337{
1339 if(t && t->period_ms > 0) {
1340 crm_trace("Starting timer %s", t->name);
1341 t->id = pcmk__create_timer(t->period_ms, mainloop_timer_cb, t);
1342 }
1343}
1344
1345void
1347{
1348 if(t && t->id != 0) {
1349 crm_trace("Stopping timer %s", t->name);
1350 g_source_remove(t->id);
1351 t->id = 0;
1352 }
1353}
1354
1355guint
1357{
1358 guint last = 0;
1359
1360 if(t) {
1361 last = t->period_ms;
1362 t->period_ms = period_ms;
1363 }
1364
1365 if(t && t->id != 0 && last != t->period_ms) {
1367 }
1368 return last;
1369}
1370
1372mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1373{
1375
1376 if (name != NULL) {
1377 t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1378 } else {
1379 t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1380 }
1381 t->id = 0;
1382 t->period_ms = period_ms;
1383 t->repeat = repeat;
1384 t->cb = cb;
1385 t->userdata = userdata;
1386 crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1387 return t;
1388}
1389
1390void
1392{
1393 if(t) {
1394 crm_trace("Destroying timer %s", t->name);
1396 free(t->name);
1397 free(t);
1398 }
1399}
1400
1401/*
1402 * Helpers to make sure certain events aren't lost at shutdown
1403 */
1404
1405static gboolean
1406drain_timeout_cb(gpointer user_data)
1407{
1408 bool *timeout_popped = (bool*) user_data;
1409
1410 *timeout_popped = TRUE;
1411 return FALSE;
1412}
1413
1420void
1421pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
1422{
1423 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1424 GMainContext *ctx = g_main_loop_get_context(mloop);
1425
1426 /* Drain up to n events in case some memory clean-up is pending
1427 * (helpful to reduce noise in valgrind output).
1428 */
1429 for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1430 g_main_context_dispatch(ctx);
1431 }
1432 g_main_loop_quit(mloop);
1433 }
1434}
1435
1449void
1450pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1451{
1452 bool timeout_popped = FALSE;
1453 guint timer = 0;
1454 GMainContext *ctx = NULL;
1455
1456 CRM_CHECK(mloop && check, return);
1457
1458 ctx = g_main_loop_get_context(mloop);
1459 if (ctx) {
1460 time_t start_time = time(NULL);
1461
1462 timer = pcmk__create_timer(timer_ms, drain_timeout_cb, &timeout_popped);
1463 while (!timeout_popped
1464 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1465 g_main_context_iteration(ctx, TRUE);
1466 }
1467 }
1468 if (!timeout_popped && (timer > 0)) {
1469 g_source_remove(timer);
1470 }
1471}
const char * name
Definition cib.c:26
guint pcmk__create_timer(guint interval_ms, GSourceFunc fn, gpointer data)
Definition utils.c:405
#define pcmk__assert_alloc(nmemb, size)
Definition internal.h:246
uint64_t flags
Definition remote.c:3
char data[0]
Definition cpg.c:10
uint32_t id
Definition cpg.c:0
uint32_t pid
Definition cpg.c:1
enum pcmk_ipc_server type
Definition cpg.c:3
A dumping ground.
void crm_ipc_destroy(crm_ipc_t *client)
Definition ipc_client.c:976
const char * crm_ipc_name(crm_ipc_t *client)
long crm_ipc_read(crm_ipc_t *client)
unsigned int crm_ipc_default_buffer_size(void)
Return pacemaker's IPC buffer size.
Definition ipc_common.c:31
bool crm_ipc_connected(crm_ipc_t *client)
void crm_ipc_close(crm_ipc_t *client)
Definition ipc_client.c:963
const char * crm_ipc_buffer(crm_ipc_t *client)
struct crm_ipc_s crm_ipc_t
Definition ipc.h:159
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Create a new (legacy) object for using Pacemaker daemon IPC.
Definition ipc_client.c:877
int pcmk__connect_generic_ipc(crm_ipc_t *ipc)
Definition ipc_client.c:912
int pcmk__ipc_fd(crm_ipc_t *ipc, int *fd)
void pcmk__ipc_free_client_buffer(crm_ipc_t *client)
#define crm_warn(fmt, args...)
Definition logging.h:360
#define LOG_STDOUT
Definition logging.h:43
#define crm_notice(fmt, args...)
Definition logging.h:363
#define crm_perror(level, fmt, args...)
Send a system error message to both the log and stderr.
Definition logging.h:299
#define CRM_CHECK(expr, failure_action)
Definition logging.h:213
#define crm_debug(fmt, args...)
Definition logging.h:368
#define crm_err(fmt, args...)
Definition logging.h:357
unsigned int crm_log_level
Definition logging.c:45
#define crm_trace(fmt, args...)
Definition logging.h:370
void pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
Drain some remaining main loop events then quit it.
Definition mainloop.c:1421
qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio)
Start server-side API end-point, hooked into the internal event loop.
Definition mainloop.c:644
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition mainloop.c:353
guint pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
Get period for mainloop timer.
Definition mainloop.c:913
int mainloop_child_timeout(mainloop_child_t *child)
Definition mainloop.c:1034
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition mainloop.c:1022
void mainloop_set_trigger(crm_trigger_t *source)
Definition mainloop.c:195
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition mainloop.c:203
int pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata, const struct ipc_client_callbacks *callbacks, mainloop_io_t **source)
Connect to IPC and add it as a main loop source.
Definition mainloop.c:867
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition mainloop.c:1356
void * mainloop_child_userdata(mainloop_child_t *child)
Definition mainloop.c:1040
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition mainloop.c:953
bool mainloop_timer_running(mainloop_timer_t *t)
Definition mainloop.c:1327
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition mainloop.c:604
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition mainloop.c:637
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition mainloop.c:1372
sighandler_t crm_signal_handler(int sig, sighandler_t dispatch)
Definition mainloop.c:304
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition mainloop.c:922
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition mainloop.c:164
void mainloop_timer_del(mainloop_timer_t *t)
Definition mainloop.c:1391
void mainloop_timer_start(mainloop_timer_t *t)
Definition mainloop.c:1336
void mainloop_del_fd(mainloop_io_t *client)
Definition mainloop.c:1006
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition mainloop.c:681
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Create a trigger to be used as a mainloop source.
Definition mainloop.c:183
void mainloop_cleanup(void)
Definition mainloop.c:417
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition mainloop.c:1289
void mainloop_timer_stop(mainloop_timer_t *t)
Definition mainloop.c:1346
gboolean mainloop_child_kill(pid_t pid)
Definition mainloop.c:1202
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition mainloop.c:947
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition mainloop.c:1046
struct signal_s crm_signal_t
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition mainloop.c:962
void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool(*check)(guint))
Process main loop events while a certain condition is met.
Definition mainloop.c:1450
const char * mainloop_child_name(mainloop_child_t *child)
Definition mainloop.c:1028
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition mainloop.c:1258
gboolean mainloop_destroy_signal(int sig)
Definition mainloop.c:397
Wrappers for and extensions to glib mainloop.
struct mainloop_child_s mainloop_child_t
Definition mainloop.h:42
struct mainloop_timer_s mainloop_timer_t
Definition mainloop.h:45
void(* sighandler_t)(int)
Definition mainloop.h:61
struct mainloop_io_s mainloop_io_t
Definition mainloop.h:41
mainloop_child_flags
Definition mainloop.h:33
@ mainloop_leave_pid_group
Definition mainloop.h:35
struct trigger_s crm_trigger_t
Definition mainloop.h:39
#define PCMK__ENV_IPC_TYPE
const char * pcmk__env_option(const char *option)
Definition options.c:1085
unsigned int timeout
Definition pcmk_fence.c:34
const char * pcmk_strerror(int rc)
Definition results.c:257
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition results.c:617
@ pcmk_rc_ok
Definition results.h:159
#define pcmk__assert(expr)
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define pcmk__str_copy(str)
void(* destroy)(gpointer userdata)
Destroy function for mainloop IPC connection client data.
Definition mainloop.h:103
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Dispatch function for an IPC connection used as mainloop source.
Definition mainloop.h:96
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition mainloop.h:151
void(* destroy)(gpointer userdata)
Destroy function for mainloop file descriptor client data.
Definition mainloop.h:158
Wrappers for and extensions to libxml2.