pacemaker  2.0.2-debe490
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
mainloop.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2019 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 
12 #ifndef _GNU_SOURCE
13 # define _GNU_SOURCE
14 #endif
15 
16 #include <stdlib.h>
17 #include <signal.h>
18 #include <errno.h>
19 
20 #include <sys/wait.h>
21 
22 #include <crm/crm.h>
23 #include <crm/common/xml.h>
24 #include <crm/common/mainloop.h>
25 #include <crm/common/ipcs.h>
26 
27 #include <qb/qbarray.h>
28 
29 struct mainloop_child_s {
30  pid_t pid;
31  char *desc;
32  unsigned timerid;
33  gboolean timeout;
34  void *privatedata;
35 
37 
38  /* Called when a process dies */
39  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
40 };
41 
/* Custom glib source that is dispatched after its trigger flag has been set */
struct trigger_s {
    GSource source;     /* cast to and from GSource * by the trigger functions */
    gboolean running;   /* set while a dispatched job reported it is not yet complete */
    gboolean trigger;   /* dispatch requested; returned by the prepare/check functions */
    void *user_data;    /* argument handed to the dispatch callback */
    guint id;           /* value returned by g_source_attach() */

};
50 
51 static gboolean
52 crm_trigger_prepare(GSource * source, gint * timeout)
53 {
54  crm_trigger_t *trig = (crm_trigger_t *) source;
55 
56  /* cluster-glue's FD and IPC related sources make use of
57  * g_source_add_poll() but do not set a timeout in their prepare
58  * functions
59  *
60  * This means mainloop's poll() will block until an event for one
61  * of these sources occurs - any /other/ type of source, such as
62  * this one or g_idle_*, that doesn't use g_source_add_poll() is
63  * S-O-L and won't be processed until there is something fd-based
64  * happens.
65  *
66  * Luckily the timeout we can set here affects all sources and
67  * puts an upper limit on how long poll() can take.
68  *
69  * So unconditionally set a small-ish timeout, not too small that
70  * we're in constant motion, which will act as an upper bound on
71  * how long the signal handling might be delayed for.
72  */
73  *timeout = 500; /* Timeout in ms */
74 
75  return trig->trigger;
76 }
77 
78 static gboolean
79 crm_trigger_check(GSource * source)
80 {
81  crm_trigger_t *trig = (crm_trigger_t *) source;
82 
83  return trig->trigger;
84 }
85 
86 static gboolean
87 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
88 {
89  int rc = TRUE;
90  crm_trigger_t *trig = (crm_trigger_t *) source;
91 
92  if (trig->running) {
93  /* Wait until the existing job is complete before starting the next one */
94  return TRUE;
95  }
96  trig->trigger = FALSE;
97 
98  if (callback) {
99  rc = callback(trig->user_data);
100  if (rc < 0) {
101  crm_trace("Trigger handler %p not yet complete", trig);
102  trig->running = TRUE;
103  rc = TRUE;
104  }
105  }
106  return rc;
107 }
108 
/*!
 * \internal
 * \brief glib "finalize" function for trigger sources (only logs a trace)
 *
 * \param[in] source  Trigger source being finalized
 */
static void
crm_trigger_finalize(GSource * source)
{
    crm_trace("Trigger %p destroyed", source);
}
114 
/* Two alternative implementations of g_source_refcount(). The first is
 * compiled out; the active one always reports 0, so the reference counts
 * logged by callers in this file are placeholders.
 */
#if 0
/* Local mirror of a glib-private structure, used only to read ref_count */
struct _GSourceCopy
{
    gpointer callback_data;
    GSourceCallbackFuncs *callback_funcs;

    const GSourceFuncs *source_funcs;
    guint ref_count;

    GMainContext *context;

    gint priority;
    guint flags;
    guint source_id;

    GSList *poll_fds;

    GSource *prev;
    GSource *next;

    char *name;

    void *priv;
};

static int
g_source_refcount(GSource * source)
{
    /* Duplicating the contents of private header files is a necessary evil */
    if (source) {
        struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
        return evil->ref_count;
    }
    return 0;
}
#else
/* Active variant: the reference count is not inspected, always returns 0 */
static int g_source_refcount(GSource * source)
{
    return 0;
}
#endif
156 
157 static GSourceFuncs crm_trigger_funcs = {
158  crm_trigger_prepare,
159  crm_trigger_check,
160  crm_trigger_dispatch,
161  crm_trigger_finalize,
162 };
163 
164 static crm_trigger_t *
165 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
166  gpointer userdata)
167 {
168  crm_trigger_t *trigger = NULL;
169 
170  trigger = (crm_trigger_t *) source;
171 
172  trigger->id = 0;
173  trigger->trigger = FALSE;
174  trigger->user_data = userdata;
175 
176  if (dispatch) {
177  g_source_set_callback(source, dispatch, trigger, NULL);
178  }
179 
180  g_source_set_priority(source, priority);
181  g_source_set_can_recurse(source, FALSE);
182 
183  crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
184  trigger->id = g_source_attach(source, NULL);
185  crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
186 
187  return trigger;
188 }
189 
190 void
192 {
193  crm_trace("Trigger handler %p complete", trig);
194  trig->running = FALSE;
195 }
196 
197 /* If dispatch returns:
198  * -1: Job running but not complete
199  * 0: Remove the trigger from mainloop
200  * 1: Leave the trigger in mainloop
201  */
203 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
204 {
205  GSource *source = NULL;
206 
207  CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
208  source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
209  CRM_ASSERT(source != NULL);
210 
211  return mainloop_setup_trigger(source, priority, dispatch, userdata);
212 }
213 
214 void
216 {
217  if(source) {
218  source->trigger = TRUE;
219  }
220 }
221 
222 gboolean
224 {
225  GSource *gs = NULL;
226 
227  if(source == NULL) {
228  return TRUE;
229  }
230 
231  gs = (GSource *)source;
232 
233  if(g_source_refcount(gs) > 2) {
234  crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
235  }
236 
237  g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
238  g_source_unref(gs); /* The caller no longer carries a reference to source
239  *
240  * At this point the source should be free'd,
241  * unless we're currently processing said
242  * source, in which case mainloop holds an
243  * additional reference and it will be free'd
244  * once our processing completes
245  */
246  return TRUE;
247 }
248 
// Define a custom glib source for signal handling

// Data structure for custom glib source
typedef struct signal_s {
    crm_trigger_t trigger;      // trigger that invoked source (must be first)
    void (*handler) (int sig);  // signal handler
    int signal;                 // signal that was received
} crm_signal_t;

// Table to associate signal handlers with signal numbers
// (indexed by signal number; an entry is NULL when nothing is registered)
static crm_signal_t *crm_signals[NSIG];
260 
272 static gboolean
273 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
274 {
275  crm_signal_t *sig = (crm_signal_t *) source;
276 
277  if(sig->signal != SIGCHLD) {
278  crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
279  strsignal(sig->signal), sig->signal,
280  (sig->handler? "invoking" : "no"));
281  }
282 
283  sig->trigger.trigger = FALSE;
284  if (sig->handler) {
285  sig->handler(sig->signal);
286  }
287  return TRUE;
288 }
289 
299 static void
300 mainloop_signal_handler(int sig)
301 {
302  if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
303  mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
304  }
305 }
306 
307 // Functions implementing our custom glib source for signal handling
308 static GSourceFuncs crm_signal_funcs = {
309  crm_trigger_prepare,
310  crm_trigger_check,
311  crm_signal_dispatch,
312  crm_trigger_finalize,
313 };
314 
329 {
330  sigset_t mask;
331  struct sigaction sa;
332  struct sigaction old;
333 
334  if (sigemptyset(&mask) < 0) {
335  crm_err("Could not set handler for signal %d: %s",
336  sig, pcmk_strerror(errno));
337  return SIG_ERR;
338  }
339 
340  memset(&sa, 0, sizeof(struct sigaction));
341  sa.sa_handler = dispatch;
342  sa.sa_flags = SA_RESTART;
343  sa.sa_mask = mask;
344 
345  if (sigaction(sig, &sa, &old) < 0) {
346  crm_err("Could not set handler for signal %d: %s",
347  sig, pcmk_strerror(errno));
348  return SIG_ERR;
349  }
350  return old.sa_handler;
351 }
352 
353 /*
354  * \brief Use crm_signal_handler() instead
355  * \deprecated
356  */
357 gboolean
358 crm_signal(int sig, void (*dispatch) (int sig))
359 {
360  return crm_signal_handler(sig, dispatch) != SIG_ERR;
361 }
362 
363 static void
364 mainloop_destroy_signal_entry(int sig)
365 {
366  crm_signal_t *tmp = crm_signals[sig];
367 
368  crm_signals[sig] = NULL;
369 
370  crm_trace("Destroying signal %d", sig);
372 }
373 
385 gboolean
386 mainloop_add_signal(int sig, void (*dispatch) (int sig))
387 {
388  GSource *source = NULL;
389  int priority = G_PRIORITY_HIGH - 1;
390 
391  if (sig == SIGTERM) {
392  /* TERM is higher priority than other signals,
393  * signals are higher priority than other ipc.
394  * Yes, minus: smaller is "higher"
395  */
396  priority--;
397  }
398 
399  if (sig >= NSIG || sig < 0) {
400  crm_err("Signal %d is out of range", sig);
401  return FALSE;
402 
403  } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
404  crm_trace("Signal handler for %d is already installed", sig);
405  return TRUE;
406 
407  } else if (crm_signals[sig] != NULL) {
408  crm_err("Different signal handler for %d is already installed", sig);
409  return FALSE;
410  }
411 
412  CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
413  source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
414 
415  crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
416  CRM_ASSERT(crm_signals[sig] != NULL);
417 
418  crm_signals[sig]->handler = dispatch;
419  crm_signals[sig]->signal = sig;
420 
421  if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
422  mainloop_destroy_signal_entry(sig);
423  return FALSE;
424  }
425 #if 0
426  /* If we want signals to interrupt mainloop's poll(), instead of waiting for
427  * the timeout, then we should call siginterrupt() below
428  *
429  * For now, just enforce a low timeout
430  */
431  if (siginterrupt(sig, 1) < 0) {
432  crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
433  }
434 #endif
435 
436  return TRUE;
437 }
438 
439 gboolean
441 {
442  if (sig >= NSIG || sig < 0) {
443  crm_err("Signal %d is out of range", sig);
444  return FALSE;
445 
446  } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
447  crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
448  return FALSE;
449 
450  } else if (crm_signals[sig] == NULL) {
451  return TRUE;
452  }
453  mainloop_destroy_signal_entry(sig);
454  return TRUE;
455 }
456 
457 static qb_array_t *gio_map = NULL;
458 
459 void
461 {
462  if (gio_map) {
463  qb_array_free(gio_map);
464  }
465 
466  for (int sig = 0; sig < NSIG; ++sig) {
467  mainloop_destroy_signal_entry(sig);
468  }
469 }
470 
/*
 * libqb...
 *
 * Adaptor tying a file descriptor watched on libqb's behalf to the glib
 * source that polls it. Instances live in gio_map and are looked up by fd.
 */
struct gio_to_qb_poll {
    int32_t is_used;            /* incremented per watch added, decremented in gio_poll_destroy() */
    guint source;               /* glib source ID of the current watch, 0 if none */
    int32_t events;             /* I/O conditions being watched */
    void *data;                 /* argument passed back to fn */
    qb_ipcs_dispatch_fn_t fn;   /* libqb function called when the fd has activity */
    enum qb_loop_priority p;    /* priority requested by libqb (stored only) */
};
482 
483 static gboolean
484 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
485 {
486  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
487  gint fd = g_io_channel_unix_get_fd(gio);
488 
489  crm_trace("%p.%d %d", data, fd, condition);
490 
491  /* if this assert get's hit, then there is a race condition between
492  * when we destroy a fd and when mainloop actually gives it up */
493  CRM_ASSERT(adaptor->is_used > 0);
494 
495  return (adaptor->fn(fd, condition, adaptor->data) == 0);
496 }
497 
498 static void
499 gio_poll_destroy(gpointer data)
500 {
501  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
502 
503  adaptor->is_used--;
504  CRM_ASSERT(adaptor->is_used >= 0);
505 
506  if (adaptor->is_used == 0) {
507  crm_trace("Marking adaptor %p unused", adaptor);
508  adaptor->source = 0;
509  }
510 }
511 
/*!
 * \internal
 * \brief Add or replace the glib watch for a descriptor on libqb's behalf
 *
 * \param[in] p     libqb priority (stored in the adaptor)
 * \param[in] fd    File descriptor to watch; also the index into gio_map
 * \param[in] evts  I/O conditions to watch (HUP, NVAL and ERR are always added)
 * \param[in] data  Argument for \p fn
 * \param[in] fn    Function to call when the descriptor has activity
 * \param[in] add   Nonzero to add a new watch, zero to modify an existing one
 *
 * \return 0 on success; the negative qb_array_index() result if the lookup
 *         fails; -EEXIST when adding a descriptor that already has a source;
 *         -ENOENT when modifying one that is not in use; -ENOMEM if no
 *         channel could be created; -EINVAL if no source ID was obtained
 */
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
                         void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
    struct gio_to_qb_poll *adaptor;
    GIOChannel *channel;
    int32_t res = 0;

    res = qb_array_index(gio_map, fd, (void **)&adaptor);
    if (res < 0) {
        crm_err("Array lookup failed for fd=%d: %d", fd, res);
        return res;
    }

    /* Logged for modifications as well as additions */
    crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);

    if (add && adaptor->source) {
        crm_err("Adaptor for descriptor %d is still in-use", fd);
        return -EEXIST;
    }
    if (!add && !adaptor->is_used) {
        crm_err("Adaptor for descriptor %d is not in-use", fd);
        return -ENOENT;
    }

    /* channel is created with ref_count = 1 */
    channel = g_io_channel_unix_new(fd);
    if (!channel) {
        crm_err("No memory left to add fd=%d", fd);
        return -ENOMEM;
    }

    /* Modification case: drop the old watch before installing the new one */
    if (adaptor->source) {
        g_source_remove(adaptor->source);
        adaptor->source = 0;
    }

    /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
    evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);

    adaptor->fn = fn;
    adaptor->events = evts;
    adaptor->data = data;
    adaptor->p = p;
    adaptor->is_used++;
    adaptor->source =
        g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
                            gio_poll_destroy);

    /* Now that mainloop holds a reference to channel,
     * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
     *
     * This means that channel will be free'd by:
     * g_main_context_dispatch()
     *  -> g_source_destroy_internal()
     *      -> g_source_callback_unref()
     * shortly after gio_poll_destroy() completes
     */
    g_io_channel_unref(channel);

    crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
    if (adaptor->source > 0) {
        return 0;
    }

    return -EINVAL;
}
579 
/*!
 * \internal
 * \brief libqb "dispatch_add" handler: start watching a descriptor
 *
 * \return Result of gio_poll_dispatch_update() called in "add" mode
 */
static int32_t
gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
}
586 
/*!
 * \internal
 * \brief libqb "dispatch_mod" handler: change the watch for a descriptor
 *
 * \return Result of gio_poll_dispatch_update() called in "modify" mode
 */
static int32_t
gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
}
593 
594 static int32_t
595 gio_poll_dispatch_del(int32_t fd)
596 {
597  struct gio_to_qb_poll *adaptor;
598 
599  crm_trace("Looking for fd=%d", fd);
600  if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
601  if (adaptor->source) {
602  g_source_remove(adaptor->source);
603  adaptor->source = 0;
604  }
605  }
606  return 0;
607 }
608 
/* Poll handlers given to libqb via qb_ipcs_poll_handlers_set() so that IPC
 * server descriptors are watched by the glib main loop; no job handler is set
 */
struct qb_ipcs_poll_handlers gio_poll_funcs = {
    .job_add = NULL,
    .dispatch_add = gio_poll_dispatch_add,
    .dispatch_mod = gio_poll_dispatch_mod,
    .dispatch_del = gio_poll_dispatch_del,
};
615 
616 static enum qb_ipc_type
617 pick_ipc_type(enum qb_ipc_type requested)
618 {
619  const char *env = getenv("PCMK_ipc_type");
620 
621  if (env && strcmp("shared-mem", env) == 0) {
622  return QB_IPC_SHM;
623  } else if (env && strcmp("socket", env) == 0) {
624  return QB_IPC_SOCKET;
625  } else if (env && strcmp("posix", env) == 0) {
626  return QB_IPC_POSIX_MQ;
627  } else if (env && strcmp("sysv", env) == 0) {
628  return QB_IPC_SYSV_MQ;
629  } else if (requested == QB_IPC_NATIVE) {
630  /* We prefer shared memory because the server never blocks on
631  * send. If part of a message fits into the socket, libqb
632  * needs to block until the remainder can be sent also.
633  * Otherwise the client will wait forever for the remaining
634  * bytes.
635  */
636  return QB_IPC_SHM;
637  }
638  return requested;
639 }
640 
641 qb_ipcs_service_t *
642 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
643  struct qb_ipcs_service_handlers * callbacks)
644 {
645  int rc = 0;
646  qb_ipcs_service_t *server = NULL;
647 
648  if (gio_map == NULL) {
649  gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
650  }
651 
652  crm_client_init();
653  server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
654 
655 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
656  /* All clients should use at least ipc_buffer_max as their buffer size */
657  qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
658 #endif
659 
660  qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
661 
662  rc = qb_ipcs_run(server);
663  if (rc < 0) {
664  crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
665  return NULL;
666  }
667 
668  return server;
669 }
670 
671 void
672 mainloop_del_ipc_server(qb_ipcs_service_t * server)
673 {
674  if (server) {
675  qb_ipcs_destroy(server);
676  }
677 }
678 
/* A file descriptor or IPC connection watched by the glib main loop */
struct mainloop_io_s {
    char *name;         /* heap-allocated name used in log messages */
    void *userdata;     /* argument passed to the dispatch/destroy functions */

    int fd;             /* descriptor being watched */
    guint source;       /* glib source ID of the watch */
    crm_ipc_t *ipc;     /* IPC connection, or NULL for a plain descriptor */
    GIOChannel *channel;    /* channel wrapping fd */

    /* Called for each message read from an IPC connection */
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    /* Called when a plain descriptor has input */
    int (*dispatch_fn_io) (gpointer userdata);
    /* Called once from mainloop_gio_destroy() */
    void (*destroy_fn) (gpointer userdata);

};
693 
/*!
 * \internal
 * \brief glib I/O watch callback for descriptors added to the main loop
 *
 * For IPC connections, reads and dispatches up to 10 messages per call;
 * for plain descriptors, calls the I/O dispatch function once.
 *
 * \param[in] gio        Channel with activity
 * \param[in] condition  I/O conditions that occurred
 * \param[in] data       Client (mainloop_io_t *) owning the channel
 *
 * \return FALSE (remove the source) if a dispatch function returned a
 *         negative value, the IPC connection is no longer connected, or a
 *         HUP/NVAL/ERR condition occurred; otherwise TRUE
 */
static gboolean
mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
{
    gboolean keep = TRUE;
    mainloop_io_t *client = data;

    CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));

    if (condition & G_IO_IN) {
        if (client->ipc) {
            long rc = 0;
            int max = 10;   /* upper bound on messages handled per invocation */

            do {
                rc = crm_ipc_read(client->ipc);
                if (rc <= 0) {
                    crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
                              client->name, client, pcmk_strerror(rc), rc);

                } else if (client->dispatch_fn_ipc) {
                    const char *buffer = crm_ipc_buffer(client->ipc);

                    crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
                    if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
                        crm_trace("Connection to %s no longer required", client->name);
                        keep = FALSE;
                    }
                }

            } while (keep && rc > 0 && --max > 0);

        } else {
            crm_trace("New message from %s[%p] %u", client->name, client, condition);
            if (client->dispatch_fn_io) {
                if (client->dispatch_fn_io(client->userdata) < 0) {
                    crm_trace("Connection to %s no longer required", client->name);
                    keep = FALSE;
                }
            }
        }
    }

    if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
        crm_err("Connection to %s closed " CRM_XS "client=%p condition=%d",
                client->name, client, condition);
        keep = FALSE;

    } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
        crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
                  client->name, client, condition);
        keep = FALSE;

    } else if ((condition & G_IO_IN) == 0) {
        /*
         * Reference for decoding the logged condition value. GIOCondition is
         * a bitwise combination of conditions to watch for on an event source:
         *
         *   G_IO_IN   = GLIB_SYSDEF_POLLIN   =  1  There is data to read.
         *   G_IO_PRI  = GLIB_SYSDEF_POLLPRI  =  2  There is urgent data to read.
         *   G_IO_OUT  = GLIB_SYSDEF_POLLOUT  =  4  Data can be written (without blocking).
         *   G_IO_ERR  = GLIB_SYSDEF_POLLERR  =  8  Error condition.
         *   G_IO_HUP  = GLIB_SYSDEF_POLLHUP  = 16  Hung up (the connection has been
         *                                          broken, usually for pipes and sockets).
         *   G_IO_NVAL = GLIB_SYSDEF_POLLNVAL = 32  Invalid request. The file descriptor
         *                                          is not open.
         */
        crm_err("Strange condition: %d", condition);
    }

    /* keep == FALSE results in mainloop_gio_destroy() being called
     * just before the source is removed from mainloop
     */
    return keep;
}
782 
/*!
 * \internal
 * \brief Destroy notification for a descriptor watched by the main loop
 *
 * Closes any IPC connection, runs the client's destroy function once,
 * destroys the IPC connection, then frees the client.
 *
 * \param[in] c  Client (mainloop_io_t *) being destroyed
 */
static void
mainloop_gio_destroy(gpointer c)
{
    mainloop_io_t *client = c;
    /* Private copy of the name for the trace messages in this function */
    char *c_name = strdup(client->name);

    /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
     * client->channel will still have ref_count > 0... should be == 1
     */
    crm_trace("Destroying client %s[%p]", c_name, c);

    if (client->ipc) {
        crm_ipc_close(client->ipc);
    }

    if (client->destroy_fn) {
        void (*destroy_fn) (gpointer userdata) = client->destroy_fn;

        /* Cleared before the call so the function cannot be run a second time */
        client->destroy_fn = NULL;
        destroy_fn(client->userdata);
    }

    if (client->ipc) {
        crm_ipc_t *ipc = client->ipc;

        /* Detached from the client before being destroyed */
        client->ipc = NULL;
        crm_ipc_destroy(ipc);
    }

    crm_trace("Destroyed client %s[%p]", c_name, c);

    free(client->name); client->name = NULL;
    free(client);

    free(c_name);
}
819 
821 mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
822  struct ipc_client_callbacks *callbacks)
823 {
824  mainloop_io_t *client = NULL;
825  crm_ipc_t *conn = crm_ipc_new(name, max_size);
826 
827  if (conn && crm_ipc_connect(conn)) {
828  int32_t fd = crm_ipc_get_fd(conn);
829 
830  client = mainloop_add_fd(name, priority, fd, userdata, NULL);
831  }
832 
833  if (client == NULL) {
834  crm_perror(LOG_TRACE, "Connection to %s failed", name);
835  if (conn) {
836  crm_ipc_close(conn);
837  crm_ipc_destroy(conn);
838  }
839  return NULL;
840  }
841 
842  client->ipc = conn;
843  client->destroy_fn = callbacks->destroy;
844  client->dispatch_fn_ipc = callbacks->dispatch;
845  return client;
846 }
847 
848 void
850 {
851  mainloop_del_fd(client);
852 }
853 
854 crm_ipc_t *
856 {
857  if (client) {
858  return client->ipc;
859  }
860  return NULL;
861 }
862 
864 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
865  struct mainloop_fd_callbacks * callbacks)
866 {
867  mainloop_io_t *client = NULL;
868 
869  if (fd >= 0) {
870  client = calloc(1, sizeof(mainloop_io_t));
871  if (client == NULL) {
872  return NULL;
873  }
874  client->name = strdup(name);
875  client->userdata = userdata;
876 
877  if (callbacks) {
878  client->destroy_fn = callbacks->destroy;
879  client->dispatch_fn_io = callbacks->dispatch;
880  }
881 
882  client->fd = fd;
883  client->channel = g_io_channel_unix_new(fd);
884  client->source =
885  g_io_add_watch_full(client->channel, priority,
886  (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
887  client, mainloop_gio_destroy);
888 
889  /* Now that mainloop now holds a reference to channel,
890  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
891  *
892  * This means that channel will be free'd by:
893  * g_main_context_dispatch() or g_source_remove()
894  * -> g_source_destroy_internal()
895  * -> g_source_callback_unref()
896  * shortly after mainloop_gio_destroy() completes
897  */
898  g_io_channel_unref(client->channel);
899  crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
900  } else {
901  errno = EINVAL;
902  }
903 
904  return client;
905 }
906 
907 void
909 {
910  if (client != NULL) {
911  crm_trace("Removing client %s[%p]", client->name, client);
912  if (client->source) {
913  /* Results in mainloop_gio_destroy() being called just
914  * before the source is removed from mainloop
915  */
916  g_source_remove(client->source);
917  }
918  }
919 }
920 
921 static GListPtr child_list = NULL;
922 
923 pid_t
925 {
926  return child->pid;
927 }
928 
929 const char *
931 {
932  return child->desc;
933 }
934 
935 int
937 {
938  return child->timeout;
939 }
940 
941 void *
943 {
944  return child->privatedata;
945 }
946 
947 void
949 {
950  child->privatedata = NULL;
951 }
952 
953 /* good function name */
954 static void
955 child_free(mainloop_child_t *child)
956 {
957  if (child->timerid != 0) {
958  crm_trace("Removing timer %d", child->timerid);
959  g_source_remove(child->timerid);
960  child->timerid = 0;
961  }
962  free(child->desc);
963  free(child);
964 }
965 
966 /* terrible function name */
967 static int
968 child_kill_helper(mainloop_child_t *child)
969 {
970  int rc;
971  if (child->flags & mainloop_leave_pid_group) {
972  crm_debug("Kill pid %d only. leave group intact.", child->pid);
973  rc = kill(child->pid, SIGKILL);
974  } else {
975  crm_debug("Kill pid %d's group", child->pid);
976  rc = kill(-child->pid, SIGKILL);
977  }
978 
979  if (rc < 0) {
980  if (errno != ESRCH) {
981  crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
982  }
983  return -errno;
984  }
985  return 0;
986 }
987 
988 static gboolean
989 child_timeout_callback(gpointer p)
990 {
991  mainloop_child_t *child = p;
992  int rc = 0;
993 
994  child->timerid = 0;
995  if (child->timeout) {
996  crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
997  return FALSE;
998  }
999 
1000  rc = child_kill_helper(child);
1001  if (rc == ESRCH) {
1002  /* Nothing left to do. pid doesn't exist */
1003  return FALSE;
1004  }
1005 
1006  child->timeout = TRUE;
1007  crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
1008 
1009  child->timerid = g_timeout_add(5000, child_timeout_callback, child);
1010  return FALSE;
1011 }
1012 
/*!
 * \internal
 * \brief Collect a tracked child's exit status and invoke its callback
 *
 * \param[in] child  Tracked child
 * \param[in] flags  Options passed to waitpid() (for example WNOHANG)
 *
 * \return FALSE if waitpid() returned 0 (no status available, callback not
 *         invoked); TRUE otherwise, including when waitpid() failed
 */
static gboolean
child_waitpid(mainloop_child_t *child, int flags)
{
    int rc = 0;
    int core = 0;
    int signo = 0;
    int status = 0;
    int exitcode = 0;

    rc = waitpid(child->pid, &status, flags);
    if(rc == 0) {
        crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
        return FALSE;

    } else if(rc != child->pid) {
        /* waitpid() did not return our pid: report placeholder values to
         * the callback instead of a real status
         */
        signo = SIGCHLD;
        exitcode = 1;
        status = 1;
        crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);

    } else {
        crm_trace("Managed process %d exited: %p", child->pid, child);

        if (WIFEXITED(status)) {
            exitcode = WEXITSTATUS(status);
            crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);

        } else if (WIFSIGNALED(status)) {
            signo = WTERMSIG(status);
            crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
        }
#ifdef WCOREDUMP
        if (WCOREDUMP(status)) {
            core = 1;
            crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
        }
#endif
    }

    if (child->callback) {
        child->callback(child, child->pid, core, signo, exitcode);
    }
    return TRUE;
}
1057 
1058 static void
1059 child_death_dispatch(int signal)
1060 {
1061  GListPtr iter = child_list;
1062  gboolean exited;
1063 
1064  while(iter) {
1065  GListPtr saved = NULL;
1066  mainloop_child_t *child = iter->data;
1067  exited = child_waitpid(child, WNOHANG);
1068 
1069  saved = iter;
1070  iter = iter->next;
1071 
1072  if (exited == FALSE) {
1073  continue;
1074  }
1075  crm_trace("Removing process entry %p for %d", child, child->pid);
1076 
1077  child_list = g_list_remove_link(child_list, saved);
1078  g_list_free(saved);
1079  child_free(child);
1080  }
1081 }
1082 
/*!
 * \internal
 * \brief One-shot main loop callback that installs the SIGCHLD handler
 *
 * \param[in] p  Unused
 *
 * \return Always FALSE, so the calling source runs only once
 */
static gboolean
child_signal_init(gpointer p)
{
    /* Logged before the attempt; mainloop_add_signal()'s result is not checked */
    crm_trace("Installed SIGCHLD handler");
    /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
    mainloop_add_signal(SIGCHLD, child_death_dispatch);

    /* In case they terminated before the signal handler was installed */
    child_death_dispatch(SIGCHLD);
    return FALSE;
}
1094 
1095 int
1097 {
1098  GListPtr iter;
1099  mainloop_child_t *child = NULL;
1100  mainloop_child_t *match = NULL;
1101  /* It is impossible to block SIGKILL, this allows us to
1102  * call waitpid without WNOHANG flag.*/
1103  int waitflags = 0, rc = 0;
1104 
1105  for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1106  child = iter->data;
1107  if (pid == child->pid) {
1108  match = child;
1109  }
1110  }
1111 
1112  if (match == NULL) {
1113  return FALSE;
1114  }
1115 
1116  rc = child_kill_helper(match);
1117  if(rc == -ESRCH) {
1118  /* It's gone, but hasn't shown up in waitpid() yet
1119  *
1120  * Wait until we get SIGCHLD and let child_death_dispatch()
1121  * clean it up as normal (so we get the correct return
1122  * code/status)
1123  *
1124  * The blocking alternative would be to call:
1125  * child_waitpid(match, 0);
1126  */
1127  crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1128  return TRUE;
1129 
1130  } else if(rc != 0) {
1131  /* If KILL for some other reason set the WNOHANG flag since we
1132  * can't be certain what happened.
1133  */
1134  waitflags = WNOHANG;
1135  }
1136 
1137  if (child_waitpid(match, waitflags) == FALSE) {
1138  /* not much we can do if this occurs */
1139  return FALSE;
1140  }
1141 
1142  child_list = g_list_remove(child_list, match);
1143  child_free(match);
1144  return TRUE;
1145 }
1146 
1147 /* Create/Log a new tracked process
1148  * To track a process group, use -pid
1149  */
1150 void
1151 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1152  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1153 {
1154  static bool need_init = TRUE;
1155  mainloop_child_t *child = g_new(mainloop_child_t, 1);
1156 
1157  child->pid = pid;
1158  child->timerid = 0;
1159  child->timeout = FALSE;
1160  child->privatedata = privatedata;
1161  child->callback = callback;
1162  child->flags = flags;
1163 
1164  if(desc) {
1165  child->desc = strdup(desc);
1166  }
1167 
1168  if (timeout) {
1169  child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1170  }
1171 
1172  child_list = g_list_append(child_list, child);
1173 
1174  if(need_init) {
1175  need_init = FALSE;
1176  /* SIGCHLD processing has to be invoked from mainloop.
1177  * We do not want it to be possible to both add a child pid
1178  * to mainloop, and have the pid's exit callback invoked within
1179  * the same callstack. */
1180  g_timeout_add(1, child_signal_init, NULL);
1181  }
1182 }
1183 
/* Track a process with no special flags: calls
 * mainloop_child_add_with_flags() with flags set to 0
 */
void
mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
                   void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
{
    mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
}
1190 
/* A named timer built on top of g_timeout_add() */
struct mainloop_timer_s {
    guint id;           /* glib source ID while the timer is armed, 0 otherwise */
    guint period_ms;    /* interval in milliseconds */
    bool repeat;        /* whether the timer re-arms after its callback returns TRUE */
    char *name;         /* heap-allocated name used in trace messages */
    GSourceFunc cb;     /* function called when the timer pops (may be NULL) */
    void *userdata;     /* argument passed to cb */
};
1199 
1200 static gboolean mainloop_timer_cb(gpointer user_data)
1201 {
1202  int id = 0;
1203  bool repeat = FALSE;
1204  struct mainloop_timer_s *t = user_data;
1205 
1206  CRM_ASSERT(t != NULL);
1207 
1208  id = t->id;
1209  t->id = 0; /* Ensure it's unset during callbacks so that
1210  * mainloop_timer_running() works as expected
1211  */
1212 
1213  if(t->cb) {
1214  crm_trace("Invoking callbacks for timer %s", t->name);
1215  repeat = t->repeat;
1216  if(t->cb(t->userdata) == FALSE) {
1217  crm_trace("Timer %s complete", t->name);
1218  repeat = FALSE;
1219  }
1220  }
1221 
1222  if(repeat) {
1223  /* Restore if repeating */
1224  t->id = id;
1225  }
1226 
1227  return repeat;
1228 }
1229 
1231 {
1232  if(t && t->id != 0) {
1233  return TRUE;
1234  }
1235  return FALSE;
1236 }
1237 
1239 {
1241  if(t && t->period_ms > 0) {
1242  crm_trace("Starting timer %s", t->name);
1243  t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1244  }
1245 }
1246 
1248 {
1249  if(t && t->id != 0) {
1250  crm_trace("Stopping timer %s", t->name);
1251  g_source_remove(t->id);
1252  t->id = 0;
1253  }
1254 }
1255 
1257 {
1258  guint last = 0;
1259 
1260  if(t) {
1261  last = t->period_ms;
1262  t->period_ms = period_ms;
1263  }
1264 
1265  if(t && t->id != 0 && last != t->period_ms) {
1267  }
1268  return last;
1269 }
1270 
1271 
1273 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1274 {
1275  mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1276 
1277  if(t) {
1278  if(name) {
1279  t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1280  } else {
1281  t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1282  }
1283  t->id = 0;
1284  t->period_ms = period_ms;
1285  t->repeat = repeat;
1286  t->cb = cb;
1287  t->userdata = userdata;
1288  crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1289  }
1290  return t;
1291 }
1292 
1293 void
1295 {
1296  if(t) {
1297  crm_trace("Destroying timer %s", t->name);
1299  free(t->name);
1300  free(t);
1301  }
1302 }
1303 
1304 /*
1305  * Helpers to make sure certain events aren't lost at shutdown
1306  */
1307 
1308 static gboolean
1309 drain_timeout_cb(gpointer user_data)
1310 {
1311  bool *timeout_popped = (bool*) user_data;
1312 
1313  *timeout_popped = TRUE;
1314  return FALSE;
1315 }
1316 
1329 void
1330 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1331 {
1332  bool timeout_popped = FALSE;
1333  guint timer = 0;
1334  GMainContext *ctx = NULL;
1335 
1336  CRM_CHECK(mloop && check, return);
1337 
1338  ctx = g_main_loop_get_context(mloop);
1339  if (ctx) {
1340  time_t start_time = time(NULL);
1341 
1342  timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
1343  while (!timeout_popped
1344  && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1345  g_main_context_iteration(ctx, TRUE);
1346  }
1347  }
1348  if (!timeout_popped && (timer > 0)) {
1349  g_source_remove(timer);
1350  }
1351 }
#define LOG_TRACE
Definition: logging.h:26
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:156
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
Definition: ipc.c:955
A dumping ground.
#define crm_notice(fmt, args...)
Definition: logging.h:242
const char * pcmk_strerror(int rc)
Definition: results.c:188
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
Definition: logging.h:239
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:386
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:864
void mainloop_timer_start(mainloop_timer_t *t)
Definition: mainloop.c:1238
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition: mainloop.c:1256
mainloop_child_flags
Definition: mainloop.h:26
void mainloop_timer_del(mainloop_timer_t *t)
Definition: mainloop.c:1294
gboolean mainloop_child_kill(pid_t pid)
Definition: mainloop.c:1096
int crm_ipc_get_fd(crm_ipc_t *client)
Definition: ipc.c:1061
void(* sighandler_t)(int)
Definition: mainloop.h:48
void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool(*check)(guint))
Process main loop events while a certain condition is met.
Definition: mainloop.c:1330
struct mainloop_timer_s mainloop_timer_t
Definition: mainloop.h:34
struct mainloop_io_s mainloop_io_t
Definition: mainloop.h:32
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *userdata, enum mainloop_child_flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1151
struct mainloop_child_s mainloop_child_t
Definition: mainloop.h:33
void mainloop_set_trigger(crm_trigger_t *source)
Definition: mainloop.c:215
void mainloop_cleanup(void)
Definition: mainloop.c:460
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:92
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition: mainloop.c:924
long crm_ipc_read(crm_ipc_t *client)
Definition: ipc.c:1170
uint32_t pid
Definition: internal.h:83
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition: mainloop.c:223
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition: mainloop.c:609
void mainloop_timer_stop(mainloop_timer_t *t)
Definition: mainloop.c:1247
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
Definition: ipc.c:252
const char * crm_ipc_buffer(crm_ipc_t *client)
Definition: ipc.c:1217
struct trigger_s crm_trigger_t
Definition: mainloop.h:31
uint32_t id
Definition: internal.h:82
void(* destroy)(gpointer)
Definition: mainloop.h:76
#define crm_warn(fmt, args...)
Definition: logging.h:241
#define crm_debug(fmt, args...)
Definition: logging.h:245
void * mainloop_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:942
struct crm_ipc_s crm_ipc_t
Definition: ipc.h:58
void(* destroy)(gpointer userdata)
Definition: mainloop.h:93
gboolean crm_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:358
#define crm_trace(fmt, args...)
Definition: logging.h:246
sighandler_t crm_signal_handler(int sig, sighandler_t dispatch)
Definition: mainloop.c:328
Wrappers for and extensions to libxml2.
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Definition: mainloop.c:203
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:849
unsigned int crm_ipc_default_buffer_size(void)
Definition: ipc.c:71
void crm_ipc_destroy(crm_ipc_t *client)
Definition: ipc.c:1038
bool mainloop_timer_running(mainloop_timer_t *t)
Definition: mainloop.c:1230
bool crm_ipc_connected(crm_ipc_t *client)
Definition: ipc.c:1075
#define CRM_XS
Definition: logging.h:34
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *userdata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1185
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:218
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition: mainloop.c:191
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:855
#define crm_err(fmt, args...)
Definition: logging.h:240
#define CRM_ASSERT(expr)
Definition: results.h:42
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:948
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Definition: ipc.c:925
char data[0]
Definition: internal.h:92
void mainloop_del_fd(mainloop_io_t *client)
Definition: mainloop.c:908
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition: mainloop.c:672
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition: mainloop.c:1273
gboolean mainloop_destroy_signal(int sig)
Definition: mainloop.c:440
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition: mainloop.c:821
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition: mainloop.c:642
void crm_ipc_close(crm_ipc_t *client)
Definition: ipc.c:1023
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
const char * mainloop_child_name(mainloop_child_t *child)
Definition: mainloop.c:930
GList * GListPtr
Definition: crm.h:192
#define crm_info(fmt, args...)
Definition: logging.h:243
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Definition: mainloop.h:75
uint64_t flags
Definition: remote.c:148
enum crm_ais_msg_types type
Definition: internal.h:85
int mainloop_child_timeout(mainloop_child_t *child)
Definition: mainloop.c:936