This source file includes the following definitions.
- crm_trigger_prepare
- crm_trigger_check
- crm_trigger_dispatch
- crm_trigger_finalize
- mainloop_setup_trigger
- mainloop_trigger_complete
- mainloop_add_trigger
- mainloop_set_trigger
- mainloop_destroy_trigger
- crm_signal_dispatch
- mainloop_signal_handler
- crm_signal_handler
- mainloop_destroy_signal_entry
- mainloop_add_signal
- mainloop_destroy_signal
- mainloop_cleanup
- gio_read_socket
- gio_poll_destroy
- conv_prio_libqb2glib
- conv_libqb_prio2ratelimit
- gio_poll_dispatch_update
- gio_poll_dispatch_add
- gio_poll_dispatch_mod
- gio_poll_dispatch_del
- pick_ipc_type
- mainloop_add_ipc_server
- mainloop_add_ipc_server_with_prio
- mainloop_del_ipc_server
- mainloop_gio_callback
- mainloop_gio_destroy
- pcmk__add_mainloop_ipc
- pcmk__mainloop_timer_get_period
- mainloop_add_ipc_client
- mainloop_del_ipc_client
- mainloop_get_ipc_client
- mainloop_add_fd
- mainloop_del_fd
- mainloop_child_pid
- mainloop_child_name
- mainloop_child_timeout
- mainloop_child_userdata
- mainloop_clear_child_userdata
- child_free
- child_kill_helper
- child_timeout_callback
- child_waitpid
- child_death_dispatch
- child_signal_init
- mainloop_child_kill
- mainloop_child_add_with_flags
- mainloop_child_add
- mainloop_timer_cb
- mainloop_timer_running
- mainloop_timer_start
- mainloop_timer_stop
- mainloop_timer_set_period
- mainloop_timer_add
- mainloop_timer_del
- drain_timeout_cb
- pcmk_quit_main_loop
- pcmk_drain_main_loop
1
2
3
4
5
6
7
8
9
10 #include <crm_internal.h>
11
12 #include <stdlib.h>
13 #include <string.h>
14 #include <signal.h>
15 #include <errno.h>
16
17 #include <sys/wait.h>
18
19 #include <crm/crm.h>
20 #include <crm/common/xml.h>
21 #include <crm/common/mainloop.h>
22 #include <crm/common/ipc_internal.h>
23
24 #include <qb/qbarray.h>
25
/* State tracked for one child process registered with the main loop */
struct mainloop_child_s {
    pid_t pid;          /* Process ID handed to kill() and waitpid() */
    char *desc;         /* Description used in log messages; freed by child_free() */
    unsigned timerid;   /* ID of the pending timeout timer, or 0 if none */
    gboolean timeout;   /* Set once child_timeout_callback() has tried to kill the child */
    void *privatedata;  /* Opaque caller data, returned by mainloop_child_userdata() */

    enum mainloop_child_flags flags;    /* e.g. mainloop_leave_pid_group */


    /* Called by child_waitpid() once the child's termination has been collected */
    void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
};
38
/* A GLib source that is dispatched on request (see mainloop_set_trigger()) */
struct trigger_s {
    GSource source;     /* Must remain first: this struct is cast to and from GSource */
    gboolean running;   /* TRUE while a handler that returned < 0 has not yet completed */
    gboolean trigger;   /* TRUE when a dispatch has been requested */
    void *user_data;    /* Argument passed to the dispatch callback */
    guint id;           /* Value returned by g_source_attach() */

};
47
/* A named, restartable timer wrapping a GLib timeout source */
struct mainloop_timer_s {
    guint id;           /* ID of the active timeout source, or 0 when not running */
    guint period_ms;    /* Interval handed to pcmk__create_timer() */
    bool repeat;        /* Whether the timer should fire again after the callback */
    char *name;         /* Name used in trace messages; freed by mainloop_timer_del() */
    GSourceFunc cb;     /* Callback invoked when the timer fires */
    void *userdata;     /* Argument passed to cb */
};
56
57 static gboolean
58 crm_trigger_prepare(GSource * source, gint * timeout)
59 {
60 crm_trigger_t *trig = (crm_trigger_t *) source;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 *timeout = 500;
80
81 return trig->trigger;
82 }
83
84 static gboolean
85 crm_trigger_check(GSource * source)
86 {
87 crm_trigger_t *trig = (crm_trigger_t *) source;
88
89 return trig->trigger;
90 }
91
92
93
94
95
96
97
98
99
100
101
102 static gboolean
103 crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
104 {
105 gboolean rc = G_SOURCE_CONTINUE;
106 crm_trigger_t *trig = (crm_trigger_t *) source;
107
108 if (trig->running) {
109
110 return G_SOURCE_CONTINUE;
111 }
112 trig->trigger = FALSE;
113
114 if (callback) {
115 int callback_rc = callback(trig->user_data);
116
117 if (callback_rc < 0) {
118 crm_trace("Trigger handler %p not yet complete", trig);
119 trig->running = TRUE;
120 } else if (callback_rc == 0) {
121 rc = G_SOURCE_REMOVE;
122 }
123 }
124 return rc;
125 }
126
127 static void
128 crm_trigger_finalize(GSource * source)
129 {
130 crm_trace("Trigger %p destroyed", source);
131 }
132
133 static GSourceFuncs crm_trigger_funcs = {
134 crm_trigger_prepare,
135 crm_trigger_check,
136 crm_trigger_dispatch,
137 crm_trigger_finalize,
138 };
139
140 static crm_trigger_t *
141 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
142 gpointer userdata)
143 {
144 crm_trigger_t *trigger = NULL;
145
146 trigger = (crm_trigger_t *) source;
147
148 trigger->id = 0;
149 trigger->trigger = FALSE;
150 trigger->user_data = userdata;
151
152 if (dispatch) {
153 g_source_set_callback(source, dispatch, trigger, NULL);
154 }
155
156 g_source_set_priority(source, priority);
157 g_source_set_can_recurse(source, FALSE);
158
159 trigger->id = g_source_attach(source, NULL);
160 return trigger;
161 }
162
163 void
164 mainloop_trigger_complete(crm_trigger_t * trig)
165 {
166 crm_trace("Trigger handler %p complete", trig);
167 trig->running = FALSE;
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182 crm_trigger_t *
183 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
184 gpointer userdata)
185 {
186 GSource *source = NULL;
187
188 pcmk__assert(sizeof(crm_trigger_t) > sizeof(GSource));
189 source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
190
191 return mainloop_setup_trigger(source, priority, dispatch, userdata);
192 }
193
194 void
195 mainloop_set_trigger(crm_trigger_t * source)
196 {
197 if(source) {
198 source->trigger = TRUE;
199 }
200 }
201
202 gboolean
203 mainloop_destroy_trigger(crm_trigger_t * source)
204 {
205 GSource *gs = NULL;
206
207 if(source == NULL) {
208 return TRUE;
209 }
210
211 gs = (GSource *)source;
212
213 g_source_destroy(gs);
214 g_source_unref(gs);
215
216
217
218
219
220
221
222 return TRUE;
223 }
224
225
226
227
/* A trigger that is set from a signal handler and dispatched by the main loop */
typedef struct signal_s {
    crm_trigger_t trigger;      /* Must remain first: cast to and from crm_trigger_t */
    void (*handler) (int sig);  /* Called from crm_signal_dispatch() (may be NULL) */
    int signal;                 /* Signal number this entry is registered for */
} crm_signal_t;


/* Registered signal entries, indexed by signal number (NULL if none) */
static crm_signal_t *crm_signals[NSIG];
236
237
238
239
240
241
242
243
244
245
246
247
248 static gboolean
249 crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
250 {
251 crm_signal_t *sig = (crm_signal_t *) source;
252
253 if(sig->signal != SIGCHLD) {
254 crm_notice("Caught '%s' signal " QB_XS " %d (%s handler)",
255 strsignal(sig->signal), sig->signal,
256 (sig->handler? "invoking" : "no"));
257 }
258
259 sig->trigger.trigger = FALSE;
260 if (sig->handler) {
261 sig->handler(sig->signal);
262 }
263 return TRUE;
264 }
265
266
267
268
269
270
271
272
273
274
275 static void
276 mainloop_signal_handler(int sig)
277 {
278 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
279 mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
280 }
281 }
282
283
284 static GSourceFuncs crm_signal_funcs = {
285 crm_trigger_prepare,
286 crm_trigger_check,
287 crm_signal_dispatch,
288 crm_trigger_finalize,
289 };
290
291
292
293
294
295
296
297
298
299
300
301
302
303 sighandler_t
304 crm_signal_handler(int sig, sighandler_t dispatch)
305 {
306 sigset_t mask;
307 struct sigaction sa;
308 struct sigaction old;
309
310 if (sigemptyset(&mask) < 0) {
311 crm_err("Could not set handler for signal %d: %s",
312 sig, pcmk_rc_str(errno));
313 return SIG_ERR;
314 }
315
316 memset(&sa, 0, sizeof(struct sigaction));
317 sa.sa_handler = dispatch;
318 sa.sa_flags = SA_RESTART;
319 sa.sa_mask = mask;
320
321 if (sigaction(sig, &sa, &old) < 0) {
322 crm_err("Could not set handler for signal %d: %s",
323 sig, pcmk_rc_str(errno));
324 return SIG_ERR;
325 }
326 return old.sa_handler;
327 }
328
329 static void
330 mainloop_destroy_signal_entry(int sig)
331 {
332 crm_signal_t *tmp = crm_signals[sig];
333
334 if (tmp != NULL) {
335 crm_signals[sig] = NULL;
336 crm_trace("Unregistering mainloop handler for signal %d", sig);
337 mainloop_destroy_trigger((crm_trigger_t *) tmp);
338 }
339 }
340
341
342
343
344
345
346
347
348
349
350
351
352 gboolean
353 mainloop_add_signal(int sig, void (*dispatch) (int sig))
354 {
355 GSource *source = NULL;
356 int priority = G_PRIORITY_HIGH - 1;
357
358 if (sig == SIGTERM) {
359
360
361
362
363 priority--;
364 }
365
366 if (sig >= NSIG || sig < 0) {
367 crm_err("Signal %d is out of range", sig);
368 return FALSE;
369
370 } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
371 crm_trace("Signal handler for %d is already installed", sig);
372 return TRUE;
373
374 } else if (crm_signals[sig] != NULL) {
375 crm_err("Different signal handler for %d is already installed", sig);
376 return FALSE;
377 }
378
379 pcmk__assert(sizeof(crm_signal_t) > sizeof(GSource));
380 source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
381
382 crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
383 pcmk__assert(crm_signals[sig] != NULL);
384
385 crm_signals[sig]->handler = dispatch;
386 crm_signals[sig]->signal = sig;
387
388 if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
389 mainloop_destroy_signal_entry(sig);
390 return FALSE;
391 }
392
393 return TRUE;
394 }
395
396 gboolean
397 mainloop_destroy_signal(int sig)
398 {
399 if (sig >= NSIG || sig < 0) {
400 crm_err("Signal %d is out of range", sig);
401 return FALSE;
402
403 } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
404 crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
405 return FALSE;
406
407 } else if (crm_signals[sig] == NULL) {
408 return TRUE;
409 }
410 mainloop_destroy_signal_entry(sig);
411 return TRUE;
412 }
413
/* Array of struct gio_to_qb_poll adaptors, indexed by file descriptor */
static qb_array_t *gio_map = NULL;
415
416 void
417 mainloop_cleanup(void)
418 {
419 if (gio_map != NULL) {
420 qb_array_free(gio_map);
421 gio_map = NULL;
422 }
423
424 for (int sig = 0; sig < NSIG; ++sig) {
425 mainloop_destroy_signal_entry(sig);
426 }
427 }
428
429
430
431
/* Adaptor letting a libqb poll registration be driven by a GLib I/O watch */
struct gio_to_qb_poll {
    int32_t is_used;            /* Count of watches currently referring to this adaptor */
    guint source;               /* GLib source ID of the watch, or 0 if none */
    int32_t events;             /* I/O conditions being watched */
    void *data;                 /* Argument passed to fn */
    qb_ipcs_dispatch_fn_t fn;   /* libqb dispatch function called on I/O */
    enum qb_loop_priority p;    /* libqb priority requested for the watch */
};
440
441 static gboolean
442 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
443 {
444 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
445 gint fd = g_io_channel_unix_get_fd(gio);
446
447 crm_trace("%p.%d %d", data, fd, condition);
448
449
450
451 pcmk__assert(adaptor->is_used > 0);
452
453 return (adaptor->fn(fd, condition, adaptor->data) == 0);
454 }
455
456 static void
457 gio_poll_destroy(gpointer data)
458 {
459 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
460
461 adaptor->is_used--;
462 pcmk__assert(adaptor->is_used >= 0);
463
464 if (adaptor->is_used == 0) {
465 crm_trace("Marking adaptor %p unused", adaptor);
466 adaptor->source = 0;
467 }
468 }
469
470
471
472
473
474
475
476
477
478 static gint
479 conv_prio_libqb2glib(enum qb_loop_priority prio)
480 {
481 switch (prio) {
482 case QB_LOOP_LOW: return G_PRIORITY_LOW;
483 case QB_LOOP_HIGH: return G_PRIORITY_HIGH;
484 default: return G_PRIORITY_DEFAULT;
485 }
486 }
487
488
489
490
491
492
493
494
495
496
497 static enum qb_ipcs_rate_limit
498 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
499 {
500 switch (prio) {
501 case QB_LOOP_LOW: return QB_IPCS_RATE_SLOW;
502 case QB_LOOP_HIGH: return QB_IPCS_RATE_FAST;
503 default: return QB_IPCS_RATE_NORMAL;
504 }
505 }
506
507 static int32_t
508 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
509 void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
510 {
511 struct gio_to_qb_poll *adaptor;
512 GIOChannel *channel;
513 int32_t res = 0;
514
515 res = qb_array_index(gio_map, fd, (void **)&adaptor);
516 if (res < 0) {
517 crm_err("Array lookup failed for fd=%d: %d", fd, res);
518 return res;
519 }
520
521 crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
522
523 if (add && adaptor->source) {
524 crm_err("Adaptor for descriptor %d is still in-use", fd);
525 return -EEXIST;
526 }
527 if (!add && !adaptor->is_used) {
528 crm_err("Adaptor for descriptor %d is not in-use", fd);
529 return -ENOENT;
530 }
531
532
533 channel = g_io_channel_unix_new(fd);
534 if (!channel) {
535 crm_err("No memory left to add fd=%d", fd);
536 return -ENOMEM;
537 }
538
539 if (adaptor->source) {
540 g_source_remove(adaptor->source);
541 adaptor->source = 0;
542 }
543
544
545 evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
546
547 adaptor->fn = fn;
548 adaptor->events = evts;
549 adaptor->data = data;
550 adaptor->p = p;
551 adaptor->is_used++;
552 adaptor->source =
553 g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
554 gio_read_socket, adaptor, gio_poll_destroy);
555
556
557
558
559
560
561
562
563
564
565 g_io_channel_unref(channel);
566
567 crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
568 if (adaptor->source > 0) {
569 return 0;
570 }
571
572 return -EINVAL;
573 }
574
575 static int32_t
576 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
577 void *data, qb_ipcs_dispatch_fn_t fn)
578 {
579 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
580 }
581
582 static int32_t
583 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
584 void *data, qb_ipcs_dispatch_fn_t fn)
585 {
586 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
587 }
588
589 static int32_t
590 gio_poll_dispatch_del(int32_t fd)
591 {
592 struct gio_to_qb_poll *adaptor;
593
594 crm_trace("Looking for fd=%d", fd);
595 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
596 if (adaptor->source) {
597 g_source_remove(adaptor->source);
598 adaptor->source = 0;
599 }
600 }
601 return 0;
602 }
603
604 struct qb_ipcs_poll_handlers gio_poll_funcs = {
605 .job_add = NULL,
606 .dispatch_add = gio_poll_dispatch_add,
607 .dispatch_mod = gio_poll_dispatch_mod,
608 .dispatch_del = gio_poll_dispatch_del,
609 };
610
611 static enum qb_ipc_type
612 pick_ipc_type(enum qb_ipc_type requested)
613 {
614 const char *env = pcmk__env_option(PCMK__ENV_IPC_TYPE);
615
616 if (env && strcmp("shared-mem", env) == 0) {
617 return QB_IPC_SHM;
618 } else if (env && strcmp("socket", env) == 0) {
619 return QB_IPC_SOCKET;
620 } else if (env && strcmp("posix", env) == 0) {
621 return QB_IPC_POSIX_MQ;
622 } else if (env && strcmp("sysv", env) == 0) {
623 return QB_IPC_SYSV_MQ;
624 } else if (requested == QB_IPC_NATIVE) {
625
626
627
628
629
630
631 return QB_IPC_SHM;
632 }
633 return requested;
634 }
635
636 qb_ipcs_service_t *
637 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
638 struct qb_ipcs_service_handlers *callbacks)
639 {
640 return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
641 }
642
643 qb_ipcs_service_t *
644 mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
645 struct qb_ipcs_service_handlers *callbacks,
646 enum qb_loop_priority prio)
647 {
648 int rc = 0;
649 qb_ipcs_service_t *server = NULL;
650
651 if (gio_map == NULL) {
652 gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
653 }
654
655 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
656
657 if (server == NULL) {
658 crm_err("Could not create %s IPC server: %s (%d)",
659 name, pcmk_rc_str(errno), errno);
660 return NULL;
661 }
662
663 if (prio != QB_LOOP_MED) {
664 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
665 }
666
667
668 qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
669 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
670
671 rc = qb_ipcs_run(server);
672 if (rc < 0) {
673 crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
674 return NULL;
675 }
676
677 return server;
678 }
679
680 void
681 mainloop_del_ipc_server(qb_ipcs_service_t * server)
682 {
683 if (server) {
684 qb_ipcs_destroy(server);
685 }
686 }
687
/* A file descriptor or IPC connection watched by the main loop */
struct mainloop_io_s {
    char *name;             /* Name used in log messages; freed in mainloop_gio_destroy() */
    void *userdata;         /* Argument passed to the dispatch and destroy callbacks */

    int fd;                 /* Watched file descriptor */
    guint source;           /* GLib source ID of the I/O watch */
    crm_ipc_t *ipc;         /* IPC connection, or NULL for a plain descriptor */
    GIOChannel *channel;    /* Channel created for fd in mainloop_add_fd() */

    /* Called for each IPC message read; a negative result ends the watch */
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    /* Called on input for a plain descriptor; a negative result ends the watch */
    int (*dispatch_fn_io) (gpointer userdata);
    /* Called once from mainloop_gio_destroy() when the watch is destroyed */
    void (*destroy_fn) (gpointer userdata);

};
702
703
704
705
706
707
708
709
710
711
712
713 static gboolean
714 mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
715 {
716 gboolean rc = G_SOURCE_CONTINUE;
717 mainloop_io_t *client = data;
718
719 pcmk__assert(client->fd == g_io_channel_unix_get_fd(gio));
720
721 if (condition & G_IO_IN) {
722 if (client->ipc) {
723 long read_rc = 0L;
724 int max = 10;
725
726 do {
727 read_rc = crm_ipc_read(client->ipc);
728 if (read_rc <= 0) {
729 crm_trace("Could not read IPC message from %s: %s (%ld)",
730 client->name, pcmk_strerror(read_rc), read_rc);
731
732 if (read_rc == -EAGAIN) {
733 continue;
734 }
735
736 } else if (client->dispatch_fn_ipc) {
737 const char *buffer = crm_ipc_buffer(client->ipc);
738
739 crm_trace("New %ld-byte IPC message from %s "
740 "after I/O condition %d",
741 read_rc, client->name, (int) condition);
742 if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
743 crm_trace("Connection to %s no longer required", client->name);
744 rc = G_SOURCE_REMOVE;
745 }
746 }
747
748 pcmk__ipc_free_client_buffer(client->ipc);
749
750 } while ((rc == G_SOURCE_CONTINUE) && (read_rc > 0) && --max > 0);
751
752 } else {
753 crm_trace("New I/O event for %s after I/O condition %d",
754 client->name, (int) condition);
755 if (client->dispatch_fn_io) {
756 if (client->dispatch_fn_io(client->userdata) < 0) {
757 crm_trace("Connection to %s no longer required", client->name);
758 rc = G_SOURCE_REMOVE;
759 }
760 }
761 }
762 }
763
764 if (client->ipc && !crm_ipc_connected(client->ipc)) {
765 crm_err("Connection to %s closed " QB_XS " client=%p condition=%d",
766 client->name, client, condition);
767 rc = G_SOURCE_REMOVE;
768
769 } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
770 crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
771 client->name, client, condition);
772 rc = G_SOURCE_REMOVE;
773
774 } else if ((condition & G_IO_IN) == 0) {
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802 crm_err("Strange condition: %d", condition);
803 }
804
805
806
807
808 return rc;
809 }
810
811 static void
812 mainloop_gio_destroy(gpointer c)
813 {
814 mainloop_io_t *client = c;
815 char *c_name = strdup(client->name);
816
817
818
819
820 crm_trace("Destroying client %s[%p]", c_name, c);
821
822 if (client->ipc) {
823 crm_ipc_close(client->ipc);
824 }
825
826 if (client->destroy_fn) {
827 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
828
829 client->destroy_fn = NULL;
830 destroy_fn(client->userdata);
831 }
832
833 if (client->ipc) {
834 crm_ipc_t *ipc = client->ipc;
835
836 client->ipc = NULL;
837 crm_ipc_destroy(ipc);
838 }
839
840 crm_trace("Destroyed client %s[%p]", c_name, c);
841
842 free(client->name); client->name = NULL;
843 free(client);
844
845 free(c_name);
846 }
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866 int
867 pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
868 const struct ipc_client_callbacks *callbacks,
869 mainloop_io_t **source)
870 {
871 int rc = pcmk_rc_ok;
872 int fd = -1;
873 const char *ipc_name = NULL;
874
875 CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
876
877 ipc_name = pcmk__s(crm_ipc_name(ipc), "Pacemaker");
878 rc = pcmk__connect_generic_ipc(ipc);
879 if (rc != pcmk_rc_ok) {
880 crm_debug("Connection to %s failed: %s", ipc_name, pcmk_rc_str(rc));
881 return rc;
882 }
883
884 rc = pcmk__ipc_fd(ipc, &fd);
885 if (rc != pcmk_rc_ok) {
886 crm_debug("Could not obtain file descriptor for %s IPC: %s",
887 ipc_name, pcmk_rc_str(rc));
888 crm_ipc_close(ipc);
889 return rc;
890 }
891
892 *source = mainloop_add_fd(ipc_name, priority, fd, userdata, NULL);
893 if (*source == NULL) {
894 rc = errno;
895 crm_ipc_close(ipc);
896 return rc;
897 }
898
899 (*source)->ipc = ipc;
900 (*source)->destroy_fn = callbacks->destroy;
901 (*source)->dispatch_fn_ipc = callbacks->dispatch;
902 return pcmk_rc_ok;
903 }
904
905
906
907
908
909
910
911
912 guint
913 pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
914 {
915 if (timer) {
916 return timer->period_ms;
917 }
918 return 0;
919 }
920
921 mainloop_io_t *
922 mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
923 void *userdata, struct ipc_client_callbacks *callbacks)
924 {
925 crm_ipc_t *ipc = crm_ipc_new(name, 0);
926 mainloop_io_t *source = NULL;
927 int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
928 &source);
929
930 if (rc != pcmk_rc_ok) {
931 if (crm_log_level == LOG_STDOUT) {
932 fprintf(stderr, "Connection to %s failed: %s",
933 name, pcmk_rc_str(rc));
934 }
935 crm_ipc_destroy(ipc);
936 if (rc > 0) {
937 errno = rc;
938 } else {
939 errno = ENOTCONN;
940 }
941 return NULL;
942 }
943 return source;
944 }
945
946 void
947 mainloop_del_ipc_client(mainloop_io_t * client)
948 {
949 mainloop_del_fd(client);
950 }
951
952 crm_ipc_t *
953 mainloop_get_ipc_client(mainloop_io_t * client)
954 {
955 if (client) {
956 return client->ipc;
957 }
958 return NULL;
959 }
960
961 mainloop_io_t *
962 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
963 struct mainloop_fd_callbacks * callbacks)
964 {
965 mainloop_io_t *client = NULL;
966
967 if (fd >= 0) {
968 client = calloc(1, sizeof(mainloop_io_t));
969 if (client == NULL) {
970 return NULL;
971 }
972 client->name = strdup(name);
973 client->userdata = userdata;
974
975 if (callbacks) {
976 client->destroy_fn = callbacks->destroy;
977 client->dispatch_fn_io = callbacks->dispatch;
978 }
979
980 client->fd = fd;
981 client->channel = g_io_channel_unix_new(fd);
982 client->source =
983 g_io_add_watch_full(client->channel, priority,
984 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
985 client, mainloop_gio_destroy);
986
987
988
989
990
991
992
993
994
995
996 g_io_channel_unref(client->channel);
997 crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
998 } else {
999 errno = EINVAL;
1000 }
1001
1002 return client;
1003 }
1004
1005 void
1006 mainloop_del_fd(mainloop_io_t * client)
1007 {
1008 if (client != NULL) {
1009 crm_trace("Removing client %s[%p]", client->name, client);
1010 if (client->source) {
1011
1012
1013
1014 g_source_remove(client->source);
1015 }
1016 }
1017 }
1018
/* List of mainloop_child_t objects for child processes being tracked */
static GList *child_list = NULL;
1020
1021 pid_t
1022 mainloop_child_pid(mainloop_child_t * child)
1023 {
1024 return child->pid;
1025 }
1026
1027 const char *
1028 mainloop_child_name(mainloop_child_t * child)
1029 {
1030 return child->desc;
1031 }
1032
1033 int
1034 mainloop_child_timeout(mainloop_child_t * child)
1035 {
1036 return child->timeout;
1037 }
1038
1039 void *
1040 mainloop_child_userdata(mainloop_child_t * child)
1041 {
1042 return child->privatedata;
1043 }
1044
1045 void
1046 mainloop_clear_child_userdata(mainloop_child_t * child)
1047 {
1048 child->privatedata = NULL;
1049 }
1050
1051
1052 static void
1053 child_free(mainloop_child_t *child)
1054 {
1055 if (child->timerid != 0) {
1056 crm_trace("Removing timer %d", child->timerid);
1057 g_source_remove(child->timerid);
1058 child->timerid = 0;
1059 }
1060 free(child->desc);
1061 free(child);
1062 }
1063
1064
1065 static int
1066 child_kill_helper(mainloop_child_t *child)
1067 {
1068 int rc;
1069 if (child->flags & mainloop_leave_pid_group) {
1070 crm_debug("Kill pid %d only. leave group intact.", child->pid);
1071 rc = kill(child->pid, SIGKILL);
1072 } else {
1073 crm_debug("Kill pid %d's group", child->pid);
1074 rc = kill(-child->pid, SIGKILL);
1075 }
1076
1077 if (rc < 0) {
1078 if (errno != ESRCH) {
1079 crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
1080 }
1081 return -errno;
1082 }
1083 return 0;
1084 }
1085
1086 static gboolean
1087 child_timeout_callback(gpointer p)
1088 {
1089 mainloop_child_t *child = p;
1090 int rc = 0;
1091
1092 child->timerid = 0;
1093 if (child->timeout) {
1094 crm_warn("%s process (PID %d) will not die!", child->desc, (int)child->pid);
1095 return FALSE;
1096 }
1097
1098 rc = child_kill_helper(child);
1099 if (rc == -ESRCH) {
1100
1101 return FALSE;
1102 }
1103
1104 child->timeout = TRUE;
1105 crm_debug("%s process (PID %d) timed out", child->desc, (int)child->pid);
1106
1107 child->timerid = pcmk__create_timer(5000, child_timeout_callback, child);
1108 return FALSE;
1109 }
1110
1111 static bool
1112 child_waitpid(mainloop_child_t *child, int flags)
1113 {
1114 int rc = 0;
1115 int core = 0;
1116 int signo = 0;
1117 int status = 0;
1118 int exitcode = 0;
1119 bool callback_needed = true;
1120
1121 rc = waitpid(child->pid, &status, flags);
1122 if (rc == 0) {
1123 crm_trace("Child process %d (%s) still active",
1124 child->pid, child->desc);
1125 callback_needed = false;
1126
1127 } else if (rc != child->pid) {
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137 signo = SIGCHLD;
1138 exitcode = 1;
1139 crm_notice("Wait for child process %d (%s) interrupted: %s",
1140 child->pid, child->desc, pcmk_rc_str(errno));
1141
1142 } else if (WIFEXITED(status)) {
1143 exitcode = WEXITSTATUS(status);
1144 crm_trace("Child process %d (%s) exited with status %d",
1145 child->pid, child->desc, exitcode);
1146
1147 } else if (WIFSIGNALED(status)) {
1148 signo = WTERMSIG(status);
1149 crm_trace("Child process %d (%s) exited with signal %d (%s)",
1150 child->pid, child->desc, signo, strsignal(signo));
1151
1152 #ifdef WCOREDUMP
1153 } else if (WCOREDUMP(status)) {
1154 core = 1;
1155 crm_err("Child process %d (%s) dumped core",
1156 child->pid, child->desc);
1157 #endif
1158
1159 } else {
1160 crm_trace("Child process %d (%s) stopped or continued",
1161 child->pid, child->desc);
1162 callback_needed = false;
1163 }
1164
1165 if (callback_needed && child->callback) {
1166 child->callback(child, child->pid, core, signo, exitcode);
1167 }
1168 return callback_needed;
1169 }
1170
1171 static void
1172 child_death_dispatch(int signal)
1173 {
1174 for (GList *iter = child_list; iter; ) {
1175 GList *saved = iter;
1176 mainloop_child_t *child = iter->data;
1177
1178 iter = iter->next;
1179 if (child_waitpid(child, WNOHANG)) {
1180 crm_trace("Removing completed process %d from child list",
1181 child->pid);
1182 child_list = g_list_remove_link(child_list, saved);
1183 g_list_free(saved);
1184 child_free(child);
1185 }
1186 }
1187 }
1188
1189 static gboolean
1190 child_signal_init(gpointer p)
1191 {
1192 crm_trace("Installed SIGCHLD handler");
1193
1194 mainloop_add_signal(SIGCHLD, child_death_dispatch);
1195
1196
1197 child_death_dispatch(SIGCHLD);
1198 return FALSE;
1199 }
1200
1201 gboolean
1202 mainloop_child_kill(pid_t pid)
1203 {
1204 GList *iter;
1205 mainloop_child_t *child = NULL;
1206 mainloop_child_t *match = NULL;
1207
1208
1209 int waitflags = 0, rc = 0;
1210
1211 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1212 child = iter->data;
1213 if (pid == child->pid) {
1214 match = child;
1215 }
1216 }
1217
1218 if (match == NULL) {
1219 return FALSE;
1220 }
1221
1222 rc = child_kill_helper(match);
1223 if(rc == -ESRCH) {
1224
1225
1226
1227
1228
1229 crm_trace("Waiting for signal that child process %d completed",
1230 match->pid);
1231 return TRUE;
1232
1233 } else if(rc != 0) {
1234
1235
1236
1237 waitflags = WNOHANG;
1238 }
1239
1240 if (!child_waitpid(match, waitflags)) {
1241
1242 return FALSE;
1243 }
1244
1245 child_list = g_list_remove(child_list, match);
1246 child_free(match);
1247 return TRUE;
1248 }
1249
1250
1251
1252
1253
1254
1255
1256
1257 void
1258 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1259 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1260 {
1261 static bool need_init = TRUE;
1262 mainloop_child_t *child = pcmk__assert_alloc(1, sizeof(mainloop_child_t));
1263
1264 child->pid = pid;
1265 child->timerid = 0;
1266 child->timeout = FALSE;
1267 child->privatedata = privatedata;
1268 child->callback = callback;
1269 child->flags = flags;
1270 child->desc = pcmk__str_copy(desc);
1271
1272 if (timeout) {
1273 child->timerid = pcmk__create_timer(timeout, child_timeout_callback, child);
1274 }
1275
1276 child_list = g_list_append(child_list, child);
1277
1278 if(need_init) {
1279 need_init = FALSE;
1280
1281
1282
1283
1284 pcmk__create_timer(1, child_signal_init, NULL);
1285 }
1286 }
1287
1288 void
1289 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1290 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1291 {
1292 mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1293 }
1294
1295 static gboolean
1296 mainloop_timer_cb(gpointer user_data)
1297 {
1298 int id = 0;
1299 bool repeat = FALSE;
1300 struct mainloop_timer_s *t = user_data;
1301
1302 pcmk__assert(t != NULL);
1303
1304 id = t->id;
1305 t->id = 0;
1306
1307
1308
1309 if(t->cb) {
1310 crm_trace("Invoking callbacks for timer %s", t->name);
1311 repeat = t->repeat;
1312 if(t->cb(t->userdata) == FALSE) {
1313 crm_trace("Timer %s complete", t->name);
1314 repeat = FALSE;
1315 }
1316 }
1317
1318 if(repeat) {
1319
1320 t->id = id;
1321 }
1322
1323 return repeat;
1324 }
1325
1326 bool
1327 mainloop_timer_running(mainloop_timer_t *t)
1328 {
1329 if(t && t->id != 0) {
1330 return TRUE;
1331 }
1332 return FALSE;
1333 }
1334
1335 void
1336 mainloop_timer_start(mainloop_timer_t *t)
1337 {
1338 mainloop_timer_stop(t);
1339 if(t && t->period_ms > 0) {
1340 crm_trace("Starting timer %s", t->name);
1341 t->id = pcmk__create_timer(t->period_ms, mainloop_timer_cb, t);
1342 }
1343 }
1344
1345 void
1346 mainloop_timer_stop(mainloop_timer_t *t)
1347 {
1348 if(t && t->id != 0) {
1349 crm_trace("Stopping timer %s", t->name);
1350 g_source_remove(t->id);
1351 t->id = 0;
1352 }
1353 }
1354
1355 guint
1356 mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
1357 {
1358 guint last = 0;
1359
1360 if(t) {
1361 last = t->period_ms;
1362 t->period_ms = period_ms;
1363 }
1364
1365 if(t && t->id != 0 && last != t->period_ms) {
1366 mainloop_timer_start(t);
1367 }
1368 return last;
1369 }
1370
1371 mainloop_timer_t *
1372 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1373 {
1374 mainloop_timer_t *t = pcmk__assert_alloc(1, sizeof(mainloop_timer_t));
1375
1376 if (name != NULL) {
1377 t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1378 } else {
1379 t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1380 }
1381 t->id = 0;
1382 t->period_ms = period_ms;
1383 t->repeat = repeat;
1384 t->cb = cb;
1385 t->userdata = userdata;
1386 crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1387 return t;
1388 }
1389
1390 void
1391 mainloop_timer_del(mainloop_timer_t *t)
1392 {
1393 if(t) {
1394 crm_trace("Destroying timer %s", t->name);
1395 mainloop_timer_stop(t);
1396 free(t->name);
1397 free(t);
1398 }
1399 }
1400
1401
1402
1403
1404
1405 static gboolean
1406 drain_timeout_cb(gpointer user_data)
1407 {
1408 bool *timeout_popped = (bool*) user_data;
1409
1410 *timeout_popped = TRUE;
1411 return FALSE;
1412 }
1413
1414
1415
1416
1417
1418
1419
1420 void
1421 pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
1422 {
1423 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1424 GMainContext *ctx = g_main_loop_get_context(mloop);
1425
1426
1427
1428
1429 for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1430 g_main_context_dispatch(ctx);
1431 }
1432 g_main_loop_quit(mloop);
1433 }
1434 }
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449 void
1450 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1451 {
1452 bool timeout_popped = FALSE;
1453 guint timer = 0;
1454 GMainContext *ctx = NULL;
1455
1456 CRM_CHECK(mloop && check, return);
1457
1458 ctx = g_main_loop_get_context(mloop);
1459 if (ctx) {
1460 time_t start_time = time(NULL);
1461
1462 timer = pcmk__create_timer(timer_ms, drain_timeout_cb, &timeout_popped);
1463 while (!timeout_popped
1464 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1465 g_main_context_iteration(ctx, TRUE);
1466 }
1467 }
1468 if (!timeout_popped && (timer > 0)) {
1469 g_source_remove(timer);
1470 }
1471 }