This source file includes following definitions.
- crm_trigger_prepare
- crm_trigger_check
- crm_trigger_dispatch
- crm_trigger_finalize
- mainloop_setup_trigger
- mainloop_trigger_complete
- mainloop_add_trigger
- mainloop_set_trigger
- mainloop_destroy_trigger
- crm_signal_dispatch
- mainloop_signal_handler
- crm_signal_handler
- mainloop_destroy_signal_entry
- mainloop_add_signal
- mainloop_destroy_signal
- mainloop_cleanup
- gio_read_socket
- gio_poll_destroy
- conv_prio_libqb2glib
- conv_libqb_prio2ratelimit
- gio_poll_dispatch_update
- gio_poll_dispatch_add
- gio_poll_dispatch_mod
- gio_poll_dispatch_del
- pick_ipc_type
- mainloop_add_ipc_server
- mainloop_add_ipc_server_with_prio
- mainloop_del_ipc_server
- mainloop_gio_callback
- mainloop_gio_destroy
- pcmk__add_mainloop_ipc
- pcmk__mainloop_timer_get_period
- mainloop_add_ipc_client
- mainloop_del_ipc_client
- mainloop_get_ipc_client
- mainloop_add_fd
- mainloop_del_fd
- mainloop_child_pid
- mainloop_child_name
- mainloop_child_timeout
- mainloop_child_userdata
- mainloop_clear_child_userdata
- child_free
- child_kill_helper
- child_timeout_callback
- child_waitpid
- child_death_dispatch
- child_signal_init
- mainloop_child_kill
- mainloop_child_add_with_flags
- mainloop_child_add
- mainloop_timer_cb
- mainloop_timer_running
- mainloop_timer_start
- mainloop_timer_stop
- mainloop_timer_set_period
- mainloop_timer_add
- mainloop_timer_del
- drain_timeout_cb
- pcmk_quit_main_loop
- pcmk_drain_main_loop
- crm_signal
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11
12 #ifndef _GNU_SOURCE
13 # define _GNU_SOURCE
14 #endif
15
16 #include <stdlib.h>
17 #include <string.h>
18 #include <signal.h>
19 #include <errno.h>
20
21 #include <sys/wait.h>
22
23 #include <crm/crm.h>
24 #include <crm/common/xml.h>
25 #include <crm/common/mainloop.h>
26 #include <crm/common/ipc_internal.h>
27
28 #include <qb/qbarray.h>
29
30 struct mainloop_child_s {
31 pid_t pid;
32 char *desc;
33 unsigned timerid;
34 gboolean timeout;
35 void *privatedata;
36
37 enum mainloop_child_flags flags;
38
39
40 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
41 };
42
43 struct trigger_s {
44 GSource source;
45 gboolean running;
46 gboolean trigger;
47 void *user_data;
48 guint id;
49
50 };
51
52 struct mainloop_timer_s {
53 guint id;
54 guint period_ms;
55 bool repeat;
56 char *name;
57 GSourceFunc cb;
58 void *userdata;
59 };
60
61 static gboolean
62 crm_trigger_prepare(GSource * source, gint * timeout)
63 {
64 crm_trigger_t *trig = (crm_trigger_t *) source;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 *timeout = 500;
84
85 return trig->trigger;
86 }
87
88 static gboolean
89 crm_trigger_check(GSource * source)
90 {
91 crm_trigger_t *trig = (crm_trigger_t *) source;
92
93 return trig->trigger;
94 }
95
96
97
98
99
100
101
102
103
104
105
106 static gboolean
107 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
108 {
109 gboolean rc = G_SOURCE_CONTINUE;
110 crm_trigger_t *trig = (crm_trigger_t *) source;
111
112 if (trig->running) {
113
114 return G_SOURCE_CONTINUE;
115 }
116 trig->trigger = FALSE;
117
118 if (callback) {
119 int callback_rc = callback(trig->user_data);
120
121 if (callback_rc < 0) {
122 crm_trace("Trigger handler %p not yet complete", trig);
123 trig->running = TRUE;
124 } else if (callback_rc == 0) {
125 rc = G_SOURCE_REMOVE;
126 }
127 }
128 return rc;
129 }
130
131 static void
132 crm_trigger_finalize(GSource * source)
133 {
134 crm_trace("Trigger %p destroyed", source);
135 }
136
137 static GSourceFuncs crm_trigger_funcs = {
138 crm_trigger_prepare,
139 crm_trigger_check,
140 crm_trigger_dispatch,
141 crm_trigger_finalize,
142 };
143
144 static crm_trigger_t *
145 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
146 gpointer userdata)
147 {
148 crm_trigger_t *trigger = NULL;
149
150 trigger = (crm_trigger_t *) source;
151
152 trigger->id = 0;
153 trigger->trigger = FALSE;
154 trigger->user_data = userdata;
155
156 if (dispatch) {
157 g_source_set_callback(source, dispatch, trigger, NULL);
158 }
159
160 g_source_set_priority(source, priority);
161 g_source_set_can_recurse(source, FALSE);
162
163 trigger->id = g_source_attach(source, NULL);
164 return trigger;
165 }
166
167 void
168 mainloop_trigger_complete(crm_trigger_t * trig)
169 {
170 crm_trace("Trigger handler %p complete", trig);
171 trig->running = FALSE;
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185 crm_trigger_t *
186 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
187 {
188 GSource *source = NULL;
189
190 CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
191 source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
192 CRM_ASSERT(source != NULL);
193
194 return mainloop_setup_trigger(source, priority, dispatch, userdata);
195 }
196
197 void
198 mainloop_set_trigger(crm_trigger_t * source)
199 {
200 if(source) {
201 source->trigger = TRUE;
202 }
203 }
204
205 gboolean
206 mainloop_destroy_trigger(crm_trigger_t * source)
207 {
208 GSource *gs = NULL;
209
210 if(source == NULL) {
211 return TRUE;
212 }
213
214 gs = (GSource *)source;
215
216 g_source_destroy(gs);
217 g_source_unref(gs);
218
219
220
221
222
223
224
225 return TRUE;
226 }
227
228
229
230
231 typedef struct signal_s {
232 crm_trigger_t trigger;
233 void (*handler) (int sig);
234 int signal;
235 } crm_signal_t;
236
237
238 static crm_signal_t *crm_signals[NSIG];
239
240
241
242
243
244
245
246
247
248
249
250
251 static gboolean
252 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
253 {
254 crm_signal_t *sig = (crm_signal_t *) source;
255
256 if(sig->signal != SIGCHLD) {
257 crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
258 strsignal(sig->signal), sig->signal,
259 (sig->handler? "invoking" : "no"));
260 }
261
262 sig->trigger.trigger = FALSE;
263 if (sig->handler) {
264 sig->handler(sig->signal);
265 }
266 return TRUE;
267 }
268
269
270
271
272
273
274
275
276
277
278 static void
279 mainloop_signal_handler(int sig)
280 {
281 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
282 mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
283 }
284 }
285
286
287 static GSourceFuncs crm_signal_funcs = {
288 crm_trigger_prepare,
289 crm_trigger_check,
290 crm_signal_dispatch,
291 crm_trigger_finalize,
292 };
293
294
295
296
297
298
299
300
301
302
303
304
305
306 sighandler_t
307 crm_signal_handler(int sig, sighandler_t dispatch)
308 {
309 sigset_t mask;
310 struct sigaction sa;
311 struct sigaction old;
312
313 if (sigemptyset(&mask) < 0) {
314 crm_err("Could not set handler for signal %d: %s",
315 sig, pcmk_rc_str(errno));
316 return SIG_ERR;
317 }
318
319 memset(&sa, 0, sizeof(struct sigaction));
320 sa.sa_handler = dispatch;
321 sa.sa_flags = SA_RESTART;
322 sa.sa_mask = mask;
323
324 if (sigaction(sig, &sa, &old) < 0) {
325 crm_err("Could not set handler for signal %d: %s",
326 sig, pcmk_rc_str(errno));
327 return SIG_ERR;
328 }
329 return old.sa_handler;
330 }
331
332 static void
333 mainloop_destroy_signal_entry(int sig)
334 {
335 crm_signal_t *tmp = crm_signals[sig];
336
337 crm_signals[sig] = NULL;
338
339 crm_trace("Destroying signal %d", sig);
340 mainloop_destroy_trigger((crm_trigger_t *) tmp);
341 }
342
343
344
345
346
347
348
349
350
351
352
353
354 gboolean
355 mainloop_add_signal(int sig, void (*dispatch) (int sig))
356 {
357 GSource *source = NULL;
358 int priority = G_PRIORITY_HIGH - 1;
359
360 if (sig == SIGTERM) {
361
362
363
364
365 priority--;
366 }
367
368 if (sig >= NSIG || sig < 0) {
369 crm_err("Signal %d is out of range", sig);
370 return FALSE;
371
372 } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
373 crm_trace("Signal handler for %d is already installed", sig);
374 return TRUE;
375
376 } else if (crm_signals[sig] != NULL) {
377 crm_err("Different signal handler for %d is already installed", sig);
378 return FALSE;
379 }
380
381 CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
382 source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
383
384 crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
385 CRM_ASSERT(crm_signals[sig] != NULL);
386
387 crm_signals[sig]->handler = dispatch;
388 crm_signals[sig]->signal = sig;
389
390 if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
391 mainloop_destroy_signal_entry(sig);
392 return FALSE;
393 }
394 #if 0
395
396
397
398
399
400 if (siginterrupt(sig, 1) < 0) {
401 crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
402 }
403 #endif
404
405 return TRUE;
406 }
407
408 gboolean
409 mainloop_destroy_signal(int sig)
410 {
411 if (sig >= NSIG || sig < 0) {
412 crm_err("Signal %d is out of range", sig);
413 return FALSE;
414
415 } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
416 crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
417 return FALSE;
418
419 } else if (crm_signals[sig] == NULL) {
420 return TRUE;
421 }
422 mainloop_destroy_signal_entry(sig);
423 return TRUE;
424 }
425
426 static qb_array_t *gio_map = NULL;
427
428 void
429 mainloop_cleanup(void)
430 {
431 if (gio_map) {
432 qb_array_free(gio_map);
433 }
434
435 for (int sig = 0; sig < NSIG; ++sig) {
436 mainloop_destroy_signal_entry(sig);
437 }
438 }
439
440
441
442
443 struct gio_to_qb_poll {
444 int32_t is_used;
445 guint source;
446 int32_t events;
447 void *data;
448 qb_ipcs_dispatch_fn_t fn;
449 enum qb_loop_priority p;
450 };
451
452 static gboolean
453 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
454 {
455 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
456 gint fd = g_io_channel_unix_get_fd(gio);
457
458 crm_trace("%p.%d %d", data, fd, condition);
459
460
461
462 CRM_ASSERT(adaptor->is_used > 0);
463
464 return (adaptor->fn(fd, condition, adaptor->data) == 0);
465 }
466
467 static void
468 gio_poll_destroy(gpointer data)
469 {
470 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
471
472 adaptor->is_used--;
473 CRM_ASSERT(adaptor->is_used >= 0);
474
475 if (adaptor->is_used == 0) {
476 crm_trace("Marking adaptor %p unused", adaptor);
477 adaptor->source = 0;
478 }
479 }
480
481
482
483
484
485
486
487
488
489 static gint
490 conv_prio_libqb2glib(enum qb_loop_priority prio)
491 {
492 gint ret = G_PRIORITY_DEFAULT;
493 switch (prio) {
494 case QB_LOOP_LOW:
495 ret = G_PRIORITY_LOW;
496 break;
497 case QB_LOOP_HIGH:
498 ret = G_PRIORITY_HIGH;
499 break;
500 default:
501 crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
502 prio);
503
504 case QB_LOOP_MED:
505 break;
506 }
507 return ret;
508 }
509
510
511
512
513
514
515
516
517
518 static enum qb_ipcs_rate_limit
519 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
520 {
521
522 enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
523 switch (prio) {
524 case QB_LOOP_LOW:
525 ret = QB_IPCS_RATE_SLOW;
526 break;
527 case QB_LOOP_HIGH:
528 ret = QB_IPCS_RATE_FAST;
529 break;
530 default:
531 crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
532 prio);
533
534 case QB_LOOP_MED:
535 break;
536 }
537 return ret;
538 }
539
540 static int32_t
541 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
542 void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
543 {
544 struct gio_to_qb_poll *adaptor;
545 GIOChannel *channel;
546 int32_t res = 0;
547
548 res = qb_array_index(gio_map, fd, (void **)&adaptor);
549 if (res < 0) {
550 crm_err("Array lookup failed for fd=%d: %d", fd, res);
551 return res;
552 }
553
554 crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
555
556 if (add && adaptor->source) {
557 crm_err("Adaptor for descriptor %d is still in-use", fd);
558 return -EEXIST;
559 }
560 if (!add && !adaptor->is_used) {
561 crm_err("Adaptor for descriptor %d is not in-use", fd);
562 return -ENOENT;
563 }
564
565
566 channel = g_io_channel_unix_new(fd);
567 if (!channel) {
568 crm_err("No memory left to add fd=%d", fd);
569 return -ENOMEM;
570 }
571
572 if (adaptor->source) {
573 g_source_remove(adaptor->source);
574 adaptor->source = 0;
575 }
576
577
578 evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
579
580 adaptor->fn = fn;
581 adaptor->events = evts;
582 adaptor->data = data;
583 adaptor->p = p;
584 adaptor->is_used++;
585 adaptor->source =
586 g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
587 gio_read_socket, adaptor, gio_poll_destroy);
588
589
590
591
592
593
594
595
596
597
598 g_io_channel_unref(channel);
599
600 crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
601 if (adaptor->source > 0) {
602 return 0;
603 }
604
605 return -EINVAL;
606 }
607
608 static int32_t
609 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
610 void *data, qb_ipcs_dispatch_fn_t fn)
611 {
612 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
613 }
614
615 static int32_t
616 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
617 void *data, qb_ipcs_dispatch_fn_t fn)
618 {
619 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
620 }
621
622 static int32_t
623 gio_poll_dispatch_del(int32_t fd)
624 {
625 struct gio_to_qb_poll *adaptor;
626
627 crm_trace("Looking for fd=%d", fd);
628 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
629 if (adaptor->source) {
630 g_source_remove(adaptor->source);
631 adaptor->source = 0;
632 }
633 }
634 return 0;
635 }
636
637 struct qb_ipcs_poll_handlers gio_poll_funcs = {
638 .job_add = NULL,
639 .dispatch_add = gio_poll_dispatch_add,
640 .dispatch_mod = gio_poll_dispatch_mod,
641 .dispatch_del = gio_poll_dispatch_del,
642 };
643
644 static enum qb_ipc_type
645 pick_ipc_type(enum qb_ipc_type requested)
646 {
647 const char *env = getenv("PCMK_ipc_type");
648
649 if (env && strcmp("shared-mem", env) == 0) {
650 return QB_IPC_SHM;
651 } else if (env && strcmp("socket", env) == 0) {
652 return QB_IPC_SOCKET;
653 } else if (env && strcmp("posix", env) == 0) {
654 return QB_IPC_POSIX_MQ;
655 } else if (env && strcmp("sysv", env) == 0) {
656 return QB_IPC_SYSV_MQ;
657 } else if (requested == QB_IPC_NATIVE) {
658
659
660
661
662
663
664 return QB_IPC_SHM;
665 }
666 return requested;
667 }
668
669 qb_ipcs_service_t *
670 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
671 struct qb_ipcs_service_handlers *callbacks)
672 {
673 return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
674 }
675
676 qb_ipcs_service_t *
677 mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
678 struct qb_ipcs_service_handlers *callbacks,
679 enum qb_loop_priority prio)
680 {
681 int rc = 0;
682 qb_ipcs_service_t *server = NULL;
683
684 if (gio_map == NULL) {
685 gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
686 }
687
688 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
689
690 if (server == NULL) {
691 crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
692 return NULL;
693 }
694
695 if (prio != QB_LOOP_MED) {
696 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
697 }
698
699
700 qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
701 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
702
703 rc = qb_ipcs_run(server);
704 if (rc < 0) {
705 crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
706 qb_ipcs_destroy(server);
707 return NULL;
708 }
709
710 return server;
711 }
712
713 void
714 mainloop_del_ipc_server(qb_ipcs_service_t * server)
715 {
716 if (server) {
717 qb_ipcs_destroy(server);
718 }
719 }
720
721 struct mainloop_io_s {
722 char *name;
723 void *userdata;
724
725 int fd;
726 guint source;
727 crm_ipc_t *ipc;
728 GIOChannel *channel;
729
730 int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
731 int (*dispatch_fn_io) (gpointer userdata);
732 void (*destroy_fn) (gpointer userdata);
733
734 };
735
736
737
738
739
740
741
742
743
744
745
746 static gboolean
747 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
748 {
749 gboolean rc = G_SOURCE_CONTINUE;
750 mainloop_io_t *client = data;
751
752 CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
753
754 if (condition & G_IO_IN) {
755 if (client->ipc) {
756 long read_rc = 0L;
757 int max = 10;
758
759 do {
760 read_rc = crm_ipc_read(client->ipc);
761 if (read_rc <= 0) {
762 crm_trace("Could not read IPC message from %s: %s (%ld)",
763 client->name, pcmk_strerror(read_rc), read_rc);
764
765 } else if (client->dispatch_fn_ipc) {
766 const char *buffer = crm_ipc_buffer(client->ipc);
767
768 crm_trace("New %ld-byte IPC message from %s "
769 "after I/O condition %d",
770 read_rc, client->name, (int) condition);
771 if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
772 crm_trace("Connection to %s no longer required", client->name);
773 rc = G_SOURCE_REMOVE;
774 }
775 }
776
777 } while ((rc == G_SOURCE_CONTINUE) && (read_rc > 0) && --max > 0);
778
779 } else {
780 crm_trace("New I/O event for %s after I/O condition %d",
781 client->name, (int) condition);
782 if (client->dispatch_fn_io) {
783 if (client->dispatch_fn_io(client->userdata) < 0) {
784 crm_trace("Connection to %s no longer required", client->name);
785 rc = G_SOURCE_REMOVE;
786 }
787 }
788 }
789 }
790
791 if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
792 crm_err("Connection to %s closed " CRM_XS "client=%p condition=%d",
793 client->name, client, condition);
794 rc = G_SOURCE_REMOVE;
795
796 } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
797 crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
798 client->name, client, condition);
799 rc = G_SOURCE_REMOVE;
800
801 } else if ((condition & G_IO_IN) == 0) {
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829 crm_err("Strange condition: %d", condition);
830 }
831
832
833
834
835 return rc;
836 }
837
838 static void
839 mainloop_gio_destroy(gpointer c)
840 {
841 mainloop_io_t *client = c;
842 char *c_name = strdup(client->name);
843
844
845
846
847 crm_trace("Destroying client %s[%p]", c_name, c);
848
849 if (client->ipc) {
850 crm_ipc_close(client->ipc);
851 }
852
853 if (client->destroy_fn) {
854 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
855
856 client->destroy_fn = NULL;
857 destroy_fn(client->userdata);
858 }
859
860 if (client->ipc) {
861 crm_ipc_t *ipc = client->ipc;
862
863 client->ipc = NULL;
864 crm_ipc_destroy(ipc);
865 }
866
867 crm_trace("Destroyed client %s[%p]", c_name, c);
868
869 free(client->name); client->name = NULL;
870 free(client);
871
872 free(c_name);
873 }
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893 int
894 pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
895 struct ipc_client_callbacks *callbacks,
896 mainloop_io_t **source)
897 {
898 CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
899
900 if (!crm_ipc_connect(ipc)) {
901 int rc = errno;
902 crm_debug("Connection to %s failed: %d", crm_ipc_name(ipc), errno);
903 return rc;
904 }
905 *source = mainloop_add_fd(crm_ipc_name(ipc), priority, crm_ipc_get_fd(ipc),
906 userdata, NULL);
907 if (*source == NULL) {
908 int rc = errno;
909
910 crm_ipc_close(ipc);
911 return rc;
912 }
913 (*source)->ipc = ipc;
914 (*source)->destroy_fn = callbacks->destroy;
915 (*source)->dispatch_fn_ipc = callbacks->dispatch;
916 return pcmk_rc_ok;
917 }
918
919
920
921
922
923
924
925
926 guint
927 pcmk__mainloop_timer_get_period(mainloop_timer_t *timer)
928 {
929 if (timer) {
930 return timer->period_ms;
931 }
932 return 0;
933 }
934
935 mainloop_io_t *
936 mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
937 void *userdata, struct ipc_client_callbacks *callbacks)
938 {
939 crm_ipc_t *ipc = crm_ipc_new(name, max_size);
940 mainloop_io_t *source = NULL;
941 int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
942 &source);
943
944 if (rc != pcmk_rc_ok) {
945 if (crm_log_level == LOG_STDOUT) {
946 fprintf(stderr, "Connection to %s failed: %s",
947 name, pcmk_rc_str(rc));
948 }
949 crm_ipc_destroy(ipc);
950 if (rc > 0) {
951 errno = rc;
952 } else {
953 errno = ENOTCONN;
954 }
955 return NULL;
956 }
957 return source;
958 }
959
960 void
961 mainloop_del_ipc_client(mainloop_io_t * client)
962 {
963 mainloop_del_fd(client);
964 }
965
966 crm_ipc_t *
967 mainloop_get_ipc_client(mainloop_io_t * client)
968 {
969 if (client) {
970 return client->ipc;
971 }
972 return NULL;
973 }
974
975 mainloop_io_t *
976 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
977 struct mainloop_fd_callbacks * callbacks)
978 {
979 mainloop_io_t *client = NULL;
980
981 if (fd >= 0) {
982 client = calloc(1, sizeof(mainloop_io_t));
983 if (client == NULL) {
984 return NULL;
985 }
986 client->name = strdup(name);
987 client->userdata = userdata;
988
989 if (callbacks) {
990 client->destroy_fn = callbacks->destroy;
991 client->dispatch_fn_io = callbacks->dispatch;
992 }
993
994 client->fd = fd;
995 client->channel = g_io_channel_unix_new(fd);
996 client->source =
997 g_io_add_watch_full(client->channel, priority,
998 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
999 client, mainloop_gio_destroy);
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010 g_io_channel_unref(client->channel);
1011 crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
1012 } else {
1013 errno = EINVAL;
1014 }
1015
1016 return client;
1017 }
1018
1019 void
1020 mainloop_del_fd(mainloop_io_t * client)
1021 {
1022 if (client != NULL) {
1023 crm_trace("Removing client %s[%p]", client->name, client);
1024 if (client->source) {
1025
1026
1027
1028 g_source_remove(client->source);
1029 }
1030 }
1031 }
1032
1033 static GList *child_list = NULL;
1034
1035 pid_t
1036 mainloop_child_pid(mainloop_child_t * child)
1037 {
1038 return child->pid;
1039 }
1040
1041 const char *
1042 mainloop_child_name(mainloop_child_t * child)
1043 {
1044 return child->desc;
1045 }
1046
1047 int
1048 mainloop_child_timeout(mainloop_child_t * child)
1049 {
1050 return child->timeout;
1051 }
1052
1053 void *
1054 mainloop_child_userdata(mainloop_child_t * child)
1055 {
1056 return child->privatedata;
1057 }
1058
1059 void
1060 mainloop_clear_child_userdata(mainloop_child_t * child)
1061 {
1062 child->privatedata = NULL;
1063 }
1064
1065
1066 static void
1067 child_free(mainloop_child_t *child)
1068 {
1069 if (child->timerid != 0) {
1070 crm_trace("Removing timer %d", child->timerid);
1071 g_source_remove(child->timerid);
1072 child->timerid = 0;
1073 }
1074 free(child->desc);
1075 free(child);
1076 }
1077
1078
1079 static int
1080 child_kill_helper(mainloop_child_t *child)
1081 {
1082 int rc;
1083 if (child->flags & mainloop_leave_pid_group) {
1084 crm_debug("Kill pid %d only. leave group intact.", child->pid);
1085 rc = kill(child->pid, SIGKILL);
1086 } else {
1087 crm_debug("Kill pid %d's group", child->pid);
1088 rc = kill(-child->pid, SIGKILL);
1089 }
1090
1091 if (rc < 0) {
1092 if (errno != ESRCH) {
1093 crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
1094 }
1095 return -errno;
1096 }
1097 return 0;
1098 }
1099
1100 static gboolean
1101 child_timeout_callback(gpointer p)
1102 {
1103 mainloop_child_t *child = p;
1104 int rc = 0;
1105
1106 child->timerid = 0;
1107 if (child->timeout) {
1108 crm_warn("%s process (PID %d) will not die!", child->desc, (int)child->pid);
1109 return FALSE;
1110 }
1111
1112 rc = child_kill_helper(child);
1113 if (rc == -ESRCH) {
1114
1115 return FALSE;
1116 }
1117
1118 child->timeout = TRUE;
1119 crm_debug("%s process (PID %d) timed out", child->desc, (int)child->pid);
1120
1121 child->timerid = g_timeout_add(5000, child_timeout_callback, child);
1122 return FALSE;
1123 }
1124
1125 static bool
1126 child_waitpid(mainloop_child_t *child, int flags)
1127 {
1128 int rc = 0;
1129 int core = 0;
1130 int signo = 0;
1131 int status = 0;
1132 int exitcode = 0;
1133 bool callback_needed = true;
1134
1135 rc = waitpid(child->pid, &status, flags);
1136 if (rc == 0) {
1137 crm_trace("Child process %d (%s) still active",
1138 child->pid, child->desc);
1139 callback_needed = false;
1140
1141 } else if (rc != child->pid) {
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151 signo = SIGCHLD;
1152 exitcode = 1;
1153 crm_notice("Wait for child process %d (%s) interrupted: %s",
1154 child->pid, child->desc, pcmk_rc_str(errno));
1155
1156 } else if (WIFEXITED(status)) {
1157 exitcode = WEXITSTATUS(status);
1158 crm_trace("Child process %d (%s) exited with status %d",
1159 child->pid, child->desc, exitcode);
1160
1161 } else if (WIFSIGNALED(status)) {
1162 signo = WTERMSIG(status);
1163 crm_trace("Child process %d (%s) exited with signal %d (%s)",
1164 child->pid, child->desc, signo, strsignal(signo));
1165
1166 #ifdef WCOREDUMP
1167 } else if (WCOREDUMP(status)) {
1168 core = 1;
1169 crm_err("Child process %d (%s) dumped core",
1170 child->pid, child->desc);
1171 #endif
1172
1173 } else {
1174 crm_trace("Child process %d (%s) stopped or continued",
1175 child->pid, child->desc);
1176 callback_needed = false;
1177 }
1178
1179 if (callback_needed && child->callback) {
1180 child->callback(child, child->pid, core, signo, exitcode);
1181 }
1182 return callback_needed;
1183 }
1184
1185 static void
1186 child_death_dispatch(int signal)
1187 {
1188 for (GList *iter = child_list; iter; ) {
1189 GList *saved = iter;
1190 mainloop_child_t *child = iter->data;
1191
1192 iter = iter->next;
1193 if (child_waitpid(child, WNOHANG)) {
1194 crm_trace("Removing completed process %d from child list",
1195 child->pid);
1196 child_list = g_list_remove_link(child_list, saved);
1197 g_list_free(saved);
1198 child_free(child);
1199 }
1200 }
1201 }
1202
1203 static gboolean
1204 child_signal_init(gpointer p)
1205 {
1206 crm_trace("Installed SIGCHLD handler");
1207
1208 mainloop_add_signal(SIGCHLD, child_death_dispatch);
1209
1210
1211 child_death_dispatch(SIGCHLD);
1212 return FALSE;
1213 }
1214
1215 gboolean
1216 mainloop_child_kill(pid_t pid)
1217 {
1218 GList *iter;
1219 mainloop_child_t *child = NULL;
1220 mainloop_child_t *match = NULL;
1221
1222
1223 int waitflags = 0, rc = 0;
1224
1225 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1226 child = iter->data;
1227 if (pid == child->pid) {
1228 match = child;
1229 }
1230 }
1231
1232 if (match == NULL) {
1233 return FALSE;
1234 }
1235
1236 rc = child_kill_helper(match);
1237 if(rc == -ESRCH) {
1238
1239
1240
1241
1242
1243 crm_trace("Waiting for signal that child process %d completed",
1244 match->pid);
1245 return TRUE;
1246
1247 } else if(rc != 0) {
1248
1249
1250
1251 waitflags = WNOHANG;
1252 }
1253
1254 if (!child_waitpid(match, waitflags)) {
1255
1256 return FALSE;
1257 }
1258
1259 child_list = g_list_remove(child_list, match);
1260 child_free(match);
1261 return TRUE;
1262 }
1263
1264
1265
1266
1267
1268
1269
1270
1271 void
1272 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1273 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1274 {
1275 static bool need_init = TRUE;
1276 mainloop_child_t *child = calloc(1, sizeof(mainloop_child_t));
1277
1278 child->pid = pid;
1279 child->timerid = 0;
1280 child->timeout = FALSE;
1281 child->privatedata = privatedata;
1282 child->callback = callback;
1283 child->flags = flags;
1284 pcmk__str_update(&child->desc, desc);
1285
1286 if (timeout) {
1287 child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1288 }
1289
1290 child_list = g_list_append(child_list, child);
1291
1292 if(need_init) {
1293 need_init = FALSE;
1294
1295
1296
1297
1298 g_timeout_add(1, child_signal_init, NULL);
1299 }
1300 }
1301
1302 void
1303 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1304 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1305 {
1306 mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1307 }
1308
1309 static gboolean
1310 mainloop_timer_cb(gpointer user_data)
1311 {
1312 int id = 0;
1313 bool repeat = FALSE;
1314 struct mainloop_timer_s *t = user_data;
1315
1316 CRM_ASSERT(t != NULL);
1317
1318 id = t->id;
1319 t->id = 0;
1320
1321
1322
1323 if(t->cb) {
1324 crm_trace("Invoking callbacks for timer %s", t->name);
1325 repeat = t->repeat;
1326 if(t->cb(t->userdata) == FALSE) {
1327 crm_trace("Timer %s complete", t->name);
1328 repeat = FALSE;
1329 }
1330 }
1331
1332 if(repeat) {
1333
1334 t->id = id;
1335 }
1336
1337 return repeat;
1338 }
1339
1340 bool
1341 mainloop_timer_running(mainloop_timer_t *t)
1342 {
1343 if(t && t->id != 0) {
1344 return TRUE;
1345 }
1346 return FALSE;
1347 }
1348
1349 void
1350 mainloop_timer_start(mainloop_timer_t *t)
1351 {
1352 mainloop_timer_stop(t);
1353 if(t && t->period_ms > 0) {
1354 crm_trace("Starting timer %s", t->name);
1355 t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1356 }
1357 }
1358
1359 void
1360 mainloop_timer_stop(mainloop_timer_t *t)
1361 {
1362 if(t && t->id != 0) {
1363 crm_trace("Stopping timer %s", t->name);
1364 g_source_remove(t->id);
1365 t->id = 0;
1366 }
1367 }
1368
1369 guint
1370 mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
1371 {
1372 guint last = 0;
1373
1374 if(t) {
1375 last = t->period_ms;
1376 t->period_ms = period_ms;
1377 }
1378
1379 if(t && t->id != 0 && last != t->period_ms) {
1380 mainloop_timer_start(t);
1381 }
1382 return last;
1383 }
1384
1385 mainloop_timer_t *
1386 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1387 {
1388 mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1389
1390 if(t) {
1391 if(name) {
1392 t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1393 } else {
1394 t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1395 }
1396 t->id = 0;
1397 t->period_ms = period_ms;
1398 t->repeat = repeat;
1399 t->cb = cb;
1400 t->userdata = userdata;
1401 crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1402 }
1403 return t;
1404 }
1405
1406 void
1407 mainloop_timer_del(mainloop_timer_t *t)
1408 {
1409 if(t) {
1410 crm_trace("Destroying timer %s", t->name);
1411 mainloop_timer_stop(t);
1412 free(t->name);
1413 free(t);
1414 }
1415 }
1416
1417
1418
1419
1420
1421 static gboolean
1422 drain_timeout_cb(gpointer user_data)
1423 {
1424 bool *timeout_popped = (bool*) user_data;
1425
1426 *timeout_popped = TRUE;
1427 return FALSE;
1428 }
1429
1430
1431
1432
1433
1434
1435
1436 void
1437 pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
1438 {
1439 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1440 GMainContext *ctx = g_main_loop_get_context(mloop);
1441
1442
1443
1444
1445 for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1446 g_main_context_dispatch(ctx);
1447 }
1448 g_main_loop_quit(mloop);
1449 }
1450 }
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464 void
1465 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1466 {
1467 bool timeout_popped = FALSE;
1468 guint timer = 0;
1469 GMainContext *ctx = NULL;
1470
1471 CRM_CHECK(mloop && check, return);
1472
1473 ctx = g_main_loop_get_context(mloop);
1474 if (ctx) {
1475 time_t start_time = time(NULL);
1476
1477 timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
1478 while (!timeout_popped
1479 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1480 g_main_context_iteration(ctx, TRUE);
1481 }
1482 }
1483 if (!timeout_popped && (timer > 0)) {
1484 g_source_remove(timer);
1485 }
1486 }
1487
1488
1489
1490
1491 #include <crm/common/mainloop_compat.h>
1492
1493 gboolean
1494 crm_signal(int sig, void (*dispatch) (int sig))
1495 {
1496 return crm_signal_handler(sig, dispatch) != SIG_ERR;
1497 }
1498
1499
1500