pacemaker  1.1.18-7fdfbbe
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
mainloop.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <crm_internal.h>
20 
21 #ifndef _GNU_SOURCE
22 # define _GNU_SOURCE
23 #endif
24 
25 #include <stdlib.h>
26 #include <signal.h>
27 #include <errno.h>
28 
29 #include <sys/wait.h>
30 
31 #include <crm/crm.h>
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/ipcs.h>
35 
36 #include <qb/qbarray.h>
37 
38 struct mainloop_child_s {
39  pid_t pid;
40  char *desc;
41  unsigned timerid;
42  unsigned watchid;
43  gboolean timeout;
44  void *privatedata;
45 
47 
48  /* Called when a process dies */
49  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
50 };
51 
52 struct trigger_s {
53  GSource source;
54  gboolean running;
55  gboolean trigger;
56  void *user_data;
57  guint id;
58 
59 };
60 
61 static gboolean
62 crm_trigger_prepare(GSource * source, gint * timeout)
63 {
64  crm_trigger_t *trig = (crm_trigger_t *) source;
65 
66  /* cluster-glue's FD and IPC related sources make use of
67  * g_source_add_poll() but do not set a timeout in their prepare
68  * functions
69  *
70  * This means mainloop's poll() will block until an event for one
71  * of these sources occurs - any /other/ type of source, such as
72  * this one or g_idle_*, that doesn't use g_source_add_poll() is
73  * S-O-L and won't be processed until there is something fd-based
74  * happens.
75  *
76  * Luckily the timeout we can set here affects all sources and
77  * puts an upper limit on how long poll() can take.
78  *
79  * So unconditionally set a small-ish timeout, not too small that
80  * we're in constant motion, which will act as an upper bound on
81  * how long the signal handling might be delayed for.
82  */
83  *timeout = 500; /* Timeout in ms */
84 
85  return trig->trigger;
86 }
87 
88 static gboolean
89 crm_trigger_check(GSource * source)
90 {
91  crm_trigger_t *trig = (crm_trigger_t *) source;
92 
93  return trig->trigger;
94 }
95 
96 static gboolean
97 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
98 {
99  int rc = TRUE;
100  crm_trigger_t *trig = (crm_trigger_t *) source;
101 
102  if (trig->running) {
103  /* Wait until the existing job is complete before starting the next one */
104  return TRUE;
105  }
106  trig->trigger = FALSE;
107 
108  if (callback) {
109  rc = callback(trig->user_data);
110  if (rc < 0) {
111  crm_trace("Trigger handler %p not yet complete", trig);
112  trig->running = TRUE;
113  rc = TRUE;
114  }
115  }
116  return rc;
117 }
118 
119 static void
120 crm_trigger_finalize(GSource * source)
121 {
122  crm_trace("Trigger %p destroyed", source);
123 }
124 
125 #if 0
126 struct _GSourceCopy
127 {
128  gpointer callback_data;
129  GSourceCallbackFuncs *callback_funcs;
130 
131  const GSourceFuncs *source_funcs;
132  guint ref_count;
133 
134  GMainContext *context;
135 
136  gint priority;
137  guint flags;
138  guint source_id;
139 
140  GSList *poll_fds;
141 
142  GSource *prev;
143  GSource *next;
144 
145  char *name;
146 
147  void *priv;
148 };
149 
150 static int
151 g_source_refcount(GSource * source)
152 {
153  /* Duplicating the contents of private header files is a necessary evil */
154  if (source) {
155  struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
156  return evil->ref_count;
157  }
158  return 0;
159 }
160 #else
161 static int g_source_refcount(GSource * source)
162 {
163  return 0;
164 }
165 #endif
166 
167 static GSourceFuncs crm_trigger_funcs = {
168  crm_trigger_prepare,
169  crm_trigger_check,
170  crm_trigger_dispatch,
171  crm_trigger_finalize,
172 };
173 
174 static crm_trigger_t *
175 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
176  gpointer userdata)
177 {
178  crm_trigger_t *trigger = NULL;
179 
180  trigger = (crm_trigger_t *) source;
181 
182  trigger->id = 0;
183  trigger->trigger = FALSE;
184  trigger->user_data = userdata;
185 
186  if (dispatch) {
187  g_source_set_callback(source, dispatch, trigger, NULL);
188  }
189 
190  g_source_set_priority(source, priority);
191  g_source_set_can_recurse(source, FALSE);
192 
193  crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
194  trigger->id = g_source_attach(source, NULL);
195  crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
196 
197  return trigger;
198 }
199 
200 void
202 {
203  crm_trace("Trigger handler %p complete", trig);
204  trig->running = FALSE;
205 }
206 
207 /* If dispatch returns:
208  * -1: Job running but not complete
209  * 0: Remove the trigger from mainloop
210  * 1: Leave the trigger in mainloop
211  */
213 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
214 {
215  GSource *source = NULL;
216 
217  CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
218  source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
219  CRM_ASSERT(source != NULL);
220 
221  return mainloop_setup_trigger(source, priority, dispatch, userdata);
222 }
223 
224 void
226 {
227  if(source) {
228  source->trigger = TRUE;
229  }
230 }
231 
232 gboolean
234 {
235  GSource *gs = NULL;
236 
237  if(source == NULL) {
238  return TRUE;
239  }
240 
241  gs = (GSource *)source;
242 
243  if(g_source_refcount(gs) > 2) {
244  crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
245  }
246 
247  g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
248  g_source_unref(gs); /* The caller no longer carries a reference to source
249  *
250  * At this point the source should be free'd,
251  * unless we're currently processing said
252  * source, in which case mainloop holds an
253  * additional reference and it will be free'd
254  * once our processing completes
255  */
256  return TRUE;
257 }
258 
259 typedef struct signal_s {
260  crm_trigger_t trigger; /* must be first */
261  void (*handler) (int sig);
262  int signal;
263 
264 } crm_signal_t;
265 
266 static crm_signal_t *crm_signals[NSIG];
267 
268 static gboolean
269 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
270 {
271  crm_signal_t *sig = (crm_signal_t *) source;
272 
273  if(sig->signal != SIGCHLD) {
274  crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
275  strsignal(sig->signal), sig->signal,
276  (sig->handler? "invoking" : "no"));
277  }
278 
279  sig->trigger.trigger = FALSE;
280  if (sig->handler) {
281  sig->handler(sig->signal);
282  }
283  return TRUE;
284 }
285 
286 static void
287 mainloop_signal_handler(int sig)
288 {
289  if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
290  mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
291  }
292 }
293 
294 static GSourceFuncs crm_signal_funcs = {
295  crm_trigger_prepare,
296  crm_trigger_check,
297  crm_signal_dispatch,
298  crm_trigger_finalize,
299 };
300 
301 gboolean
302 crm_signal(int sig, void (*dispatch) (int sig))
303 {
304  sigset_t mask;
305  struct sigaction sa;
306  struct sigaction old;
307 
308  if (sigemptyset(&mask) < 0) {
309  crm_perror(LOG_ERR, "Call to sigemptyset failed");
310  return FALSE;
311  }
312 
313  memset(&sa, 0, sizeof(struct sigaction));
314  sa.sa_handler = dispatch;
315  sa.sa_flags = SA_RESTART;
316  sa.sa_mask = mask;
317 
318  if (sigaction(sig, &sa, &old) < 0) {
319  crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
320  return FALSE;
321  }
322 
323  return TRUE;
324 }
325 
326 gboolean
327 mainloop_add_signal(int sig, void (*dispatch) (int sig))
328 {
329  GSource *source = NULL;
330  int priority = G_PRIORITY_HIGH - 1;
331 
332  if (sig == SIGTERM) {
333  /* TERM is higher priority than other signals,
334  * signals are higher priority than other ipc.
335  * Yes, minus: smaller is "higher"
336  */
337  priority--;
338  }
339 
340  if (sig >= NSIG || sig < 0) {
341  crm_err("Signal %d is out of range", sig);
342  return FALSE;
343 
344  } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
345  crm_trace("Signal handler for %d is already installed", sig);
346  return TRUE;
347 
348  } else if (crm_signals[sig] != NULL) {
349  crm_err("Different signal handler for %d is already installed", sig);
350  return FALSE;
351  }
352 
353  CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
354  source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
355 
356  crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
357  CRM_ASSERT(crm_signals[sig] != NULL);
358 
359  crm_signals[sig]->handler = dispatch;
360  crm_signals[sig]->signal = sig;
361 
362  if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
363  crm_signal_t *tmp = crm_signals[sig];
364 
365  crm_signals[sig] = NULL;
366 
368  return FALSE;
369  }
370 #if 0
371  /* If we want signals to interrupt mainloop's poll(), instead of waiting for
372  * the timeout, then we should call siginterrupt() below
373  *
374  * For now, just enforce a low timeout
375  */
376  if (siginterrupt(sig, 1) < 0) {
377  crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
378  }
379 #endif
380 
381  return TRUE;
382 }
383 
384 gboolean
386 {
387  crm_signal_t *tmp = NULL;
388 
389  if (sig >= NSIG || sig < 0) {
390  crm_err("Signal %d is out of range", sig);
391  return FALSE;
392 
393  } else if (crm_signal(sig, NULL) == FALSE) {
394  crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
395  return FALSE;
396 
397  } else if (crm_signals[sig] == NULL) {
398  return TRUE;
399  }
400 
401  crm_trace("Destroying signal %d", sig);
402  tmp = crm_signals[sig];
403  crm_signals[sig] = NULL;
405  return TRUE;
406 }
407 
408 static qb_array_t *gio_map = NULL;
409 
410 void
412 {
413  if(gio_map) {
414  qb_array_free(gio_map);
415  }
416 }
417 
418 /*
419  * libqb...
420  */
421 struct gio_to_qb_poll {
422  int32_t is_used;
423  guint source;
424  int32_t events;
425  void *data;
426  qb_ipcs_dispatch_fn_t fn;
427  enum qb_loop_priority p;
428 };
429 
430 static gboolean
431 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
432 {
433  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
434  gint fd = g_io_channel_unix_get_fd(gio);
435 
436  crm_trace("%p.%d %d", data, fd, condition);
437 
438  /* if this assert get's hit, then there is a race condition between
439  * when we destroy a fd and when mainloop actually gives it up */
440  CRM_ASSERT(adaptor->is_used > 0);
441 
442  return (adaptor->fn(fd, condition, adaptor->data) == 0);
443 }
444 
445 static void
446 gio_poll_destroy(gpointer data)
447 {
448  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
449 
450  adaptor->is_used--;
451  CRM_ASSERT(adaptor->is_used >= 0);
452 
453  if (adaptor->is_used == 0) {
454  crm_trace("Marking adaptor %p unused", adaptor);
455  adaptor->source = 0;
456  }
457 }
458 
459 static int32_t
460 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
461  void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
462 {
463  struct gio_to_qb_poll *adaptor;
464  GIOChannel *channel;
465  int32_t res = 0;
466 
467  res = qb_array_index(gio_map, fd, (void **)&adaptor);
468  if (res < 0) {
469  crm_err("Array lookup failed for fd=%d: %d", fd, res);
470  return res;
471  }
472 
473  crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
474 
475  if (add && adaptor->source) {
476  crm_err("Adaptor for descriptor %d is still in-use", fd);
477  return -EEXIST;
478  }
479  if (!add && !adaptor->is_used) {
480  crm_err("Adaptor for descriptor %d is not in-use", fd);
481  return -ENOENT;
482  }
483 
484  /* channel is created with ref_count = 1 */
485  channel = g_io_channel_unix_new(fd);
486  if (!channel) {
487  crm_err("No memory left to add fd=%d", fd);
488  return -ENOMEM;
489  }
490 
491  if (adaptor->source) {
492  g_source_remove(adaptor->source);
493  adaptor->source = 0;
494  }
495 
496  /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
497  evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
498 
499  adaptor->fn = fn;
500  adaptor->events = evts;
501  adaptor->data = data;
502  adaptor->p = p;
503  adaptor->is_used++;
504  adaptor->source =
505  g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
506  gio_poll_destroy);
507 
508  /* Now that mainloop now holds a reference to channel,
509  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
510  *
511  * This means that channel will be free'd by:
512  * g_main_context_dispatch()
513  * -> g_source_destroy_internal()
514  * -> g_source_callback_unref()
515  * shortly after gio_poll_destroy() completes
516  */
517  g_io_channel_unref(channel);
518 
519  crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
520  if (adaptor->source > 0) {
521  return 0;
522  }
523 
524  return -EINVAL;
525 }
526 
527 static int32_t
528 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
529  void *data, qb_ipcs_dispatch_fn_t fn)
530 {
531  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
532 }
533 
534 static int32_t
535 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
536  void *data, qb_ipcs_dispatch_fn_t fn)
537 {
538  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
539 }
540 
541 static int32_t
542 gio_poll_dispatch_del(int32_t fd)
543 {
544  struct gio_to_qb_poll *adaptor;
545 
546  crm_trace("Looking for fd=%d", fd);
547  if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
548  if (adaptor->source) {
549  g_source_remove(adaptor->source);
550  adaptor->source = 0;
551  }
552  }
553  return 0;
554 }
555 
556 struct qb_ipcs_poll_handlers gio_poll_funcs = {
557  .job_add = NULL,
558  .dispatch_add = gio_poll_dispatch_add,
559  .dispatch_mod = gio_poll_dispatch_mod,
560  .dispatch_del = gio_poll_dispatch_del,
561 };
562 
563 static enum qb_ipc_type
564 pick_ipc_type(enum qb_ipc_type requested)
565 {
566  const char *env = getenv("PCMK_ipc_type");
567 
568  if (env && strcmp("shared-mem", env) == 0) {
569  return QB_IPC_SHM;
570  } else if (env && strcmp("socket", env) == 0) {
571  return QB_IPC_SOCKET;
572  } else if (env && strcmp("posix", env) == 0) {
573  return QB_IPC_POSIX_MQ;
574  } else if (env && strcmp("sysv", env) == 0) {
575  return QB_IPC_SYSV_MQ;
576  } else if (requested == QB_IPC_NATIVE) {
577  /* We prefer shared memory because the server never blocks on
578  * send. If part of a message fits into the socket, libqb
579  * needs to block until the remainder can be sent also.
580  * Otherwise the client will wait forever for the remaining
581  * bytes.
582  */
583  return QB_IPC_SHM;
584  }
585  return requested;
586 }
587 
588 qb_ipcs_service_t *
589 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
590  struct qb_ipcs_service_handlers * callbacks)
591 {
592  int rc = 0;
593  qb_ipcs_service_t *server = NULL;
594 
595  if (gio_map == NULL) {
596  gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
597  }
598 
599  crm_client_init();
600  server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
601 
602 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
603  /* All clients should use at least ipc_buffer_max as their buffer size */
604  qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
605 #endif
606 
607  qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
608 
609  rc = qb_ipcs_run(server);
610  if (rc < 0) {
611  crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
612  return NULL;
613  }
614 
615  return server;
616 }
617 
618 void
619 mainloop_del_ipc_server(qb_ipcs_service_t * server)
620 {
621  if (server) {
622  qb_ipcs_destroy(server);
623  }
624 }
625 
626 struct mainloop_io_s {
627  char *name;
628  void *userdata;
629 
630  int fd;
631  guint source;
632  crm_ipc_t *ipc;
633  GIOChannel *channel;
634 
635  int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
636  int (*dispatch_fn_io) (gpointer userdata);
637  void (*destroy_fn) (gpointer userdata);
638 
639 };
640 
641 static gboolean
642 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
643 {
644  gboolean keep = TRUE;
645  mainloop_io_t *client = data;
646 
647  CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
648 
649  if (condition & G_IO_IN) {
650  if (client->ipc) {
651  long rc = 0;
652  int max = 10;
653 
654  do {
655  rc = crm_ipc_read(client->ipc);
656  if (rc <= 0) {
657  crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
658  client->name, client, pcmk_strerror(rc), rc);
659 
660  } else if (client->dispatch_fn_ipc) {
661  const char *buffer = crm_ipc_buffer(client->ipc);
662 
663  crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
664  if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
665  crm_trace("Connection to %s no longer required", client->name);
666  keep = FALSE;
667  }
668  }
669 
670  } while (keep && rc > 0 && --max > 0);
671 
672  } else {
673  crm_trace("New message from %s[%p] %u", client->name, client, condition);
674  if (client->dispatch_fn_io) {
675  if (client->dispatch_fn_io(client->userdata) < 0) {
676  crm_trace("Connection to %s no longer required", client->name);
677  keep = FALSE;
678  }
679  }
680  }
681  }
682 
683  if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
684  crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
685  keep = FALSE;
686 
687  } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
688  crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
689  client->name, client, condition);
690  keep = FALSE;
691 
692  } else if ((condition & G_IO_IN) == 0) {
693  /*
694  #define GLIB_SYSDEF_POLLIN =1
695  #define GLIB_SYSDEF_POLLPRI =2
696  #define GLIB_SYSDEF_POLLOUT =4
697  #define GLIB_SYSDEF_POLLERR =8
698  #define GLIB_SYSDEF_POLLHUP =16
699  #define GLIB_SYSDEF_POLLNVAL =32
700 
701  typedef enum
702  {
703  G_IO_IN GLIB_SYSDEF_POLLIN,
704  G_IO_OUT GLIB_SYSDEF_POLLOUT,
705  G_IO_PRI GLIB_SYSDEF_POLLPRI,
706  G_IO_ERR GLIB_SYSDEF_POLLERR,
707  G_IO_HUP GLIB_SYSDEF_POLLHUP,
708  G_IO_NVAL GLIB_SYSDEF_POLLNVAL
709  } GIOCondition;
710 
711  A bitwise combination representing a condition to watch for on an event source.
712 
713  G_IO_IN There is data to read.
714  G_IO_OUT Data can be written (without blocking).
715  G_IO_PRI There is urgent data to read.
716  G_IO_ERR Error condition.
717  G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
718  G_IO_NVAL Invalid request. The file descriptor is not open.
719  */
720  crm_err("Strange condition: %d", condition);
721  }
722 
723  /* keep == FALSE results in mainloop_gio_destroy() being called
724  * just before the source is removed from mainloop
725  */
726  return keep;
727 }
728 
729 static void
730 mainloop_gio_destroy(gpointer c)
731 {
732  mainloop_io_t *client = c;
733  char *c_name = strdup(client->name);
734 
735  /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
736  * client->channel will still have ref_count > 0... should be == 1
737  */
738  crm_trace("Destroying client %s[%p]", c_name, c);
739 
740  if (client->ipc) {
741  crm_ipc_close(client->ipc);
742  }
743 
744  if (client->destroy_fn) {
745  void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
746 
747  client->destroy_fn = NULL;
748  destroy_fn(client->userdata);
749  }
750 
751  if (client->ipc) {
752  crm_ipc_t *ipc = client->ipc;
753 
754  client->ipc = NULL;
755  crm_ipc_destroy(ipc);
756  }
757 
758  crm_trace("Destroyed client %s[%p]", c_name, c);
759 
760  free(client->name); client->name = NULL;
761  free(client);
762 
763  free(c_name);
764 }
765 
767 mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
768  struct ipc_client_callbacks *callbacks)
769 {
770  mainloop_io_t *client = NULL;
771  crm_ipc_t *conn = crm_ipc_new(name, max_size);
772 
773  if (conn && crm_ipc_connect(conn)) {
774  int32_t fd = crm_ipc_get_fd(conn);
775 
776  client = mainloop_add_fd(name, priority, fd, userdata, NULL);
777  }
778 
779  if (client == NULL) {
780  crm_perror(LOG_TRACE, "Connection to %s failed", name);
781  if (conn) {
782  crm_ipc_close(conn);
783  crm_ipc_destroy(conn);
784  }
785  return NULL;
786  }
787 
788  client->ipc = conn;
789  client->destroy_fn = callbacks->destroy;
790  client->dispatch_fn_ipc = callbacks->dispatch;
791  return client;
792 }
793 
794 void
796 {
797  mainloop_del_fd(client);
798 }
799 
800 crm_ipc_t *
802 {
803  if (client) {
804  return client->ipc;
805  }
806  return NULL;
807 }
808 
810 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
811  struct mainloop_fd_callbacks * callbacks)
812 {
813  mainloop_io_t *client = NULL;
814 
815  if (fd >= 0) {
816  client = calloc(1, sizeof(mainloop_io_t));
817  if (client == NULL) {
818  return NULL;
819  }
820  client->name = strdup(name);
821  client->userdata = userdata;
822 
823  if (callbacks) {
824  client->destroy_fn = callbacks->destroy;
825  client->dispatch_fn_io = callbacks->dispatch;
826  }
827 
828  client->fd = fd;
829  client->channel = g_io_channel_unix_new(fd);
830  client->source =
831  g_io_add_watch_full(client->channel, priority,
832  (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
833  client, mainloop_gio_destroy);
834 
835  /* Now that mainloop now holds a reference to channel,
836  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
837  *
838  * This means that channel will be free'd by:
839  * g_main_context_dispatch() or g_source_remove()
840  * -> g_source_destroy_internal()
841  * -> g_source_callback_unref()
842  * shortly after mainloop_gio_destroy() completes
843  */
844  g_io_channel_unref(client->channel);
845  crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
846  } else {
847  errno = EINVAL;
848  }
849 
850  return client;
851 }
852 
853 void
855 {
856  if (client != NULL) {
857  crm_trace("Removing client %s[%p]", client->name, client);
858  if (client->source) {
859  /* Results in mainloop_gio_destroy() being called just
860  * before the source is removed from mainloop
861  */
862  g_source_remove(client->source);
863  }
864  }
865 }
866 
867 static GListPtr child_list = NULL;
868 
869 pid_t
871 {
872  return child->pid;
873 }
874 
875 const char *
877 {
878  return child->desc;
879 }
880 
881 int
883 {
884  return child->timeout;
885 }
886 
887 void *
889 {
890  return child->privatedata;
891 }
892 
893 void
895 {
896  child->privatedata = NULL;
897 }
898 
899 /* good function name */
900 static void
901 child_free(mainloop_child_t *child)
902 {
903  if (child->timerid != 0) {
904  crm_trace("Removing timer %d", child->timerid);
905  g_source_remove(child->timerid);
906  child->timerid = 0;
907  }
908  free(child->desc);
909  free(child);
910 }
911 
912 /* terrible function name */
913 static int
914 child_kill_helper(mainloop_child_t *child)
915 {
916  int rc;
917  if (child->flags & mainloop_leave_pid_group) {
918  crm_debug("Kill pid %d only. leave group intact.", child->pid);
919  rc = kill(child->pid, SIGKILL);
920  } else {
921  crm_debug("Kill pid %d's group", child->pid);
922  rc = kill(-child->pid, SIGKILL);
923  }
924 
925  if (rc < 0) {
926  if (errno != ESRCH) {
927  crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
928  }
929  return -errno;
930  }
931  return 0;
932 }
933 
934 static gboolean
935 child_timeout_callback(gpointer p)
936 {
937  mainloop_child_t *child = p;
938  int rc = 0;
939 
940  child->timerid = 0;
941  if (child->timeout) {
942  crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
943  return FALSE;
944  }
945 
946  rc = child_kill_helper(child);
947  if (rc == ESRCH) {
948  /* Nothing left to do. pid doesn't exist */
949  return FALSE;
950  }
951 
952  child->timeout = TRUE;
953  crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
954 
955  child->timerid = g_timeout_add(5000, child_timeout_callback, child);
956  return FALSE;
957 }
958 
959 static gboolean
960 child_waitpid(mainloop_child_t *child, int flags)
961 {
962  int rc = 0;
963  int core = 0;
964  int signo = 0;
965  int status = 0;
966  int exitcode = 0;
967 
968  rc = waitpid(child->pid, &status, flags);
969  if(rc == 0) {
970  crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
971  return FALSE;
972 
973  } else if(rc != child->pid) {
974  signo = SIGCHLD;
975  exitcode = 1;
976  status = 1;
977  crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);
978 
979  } else {
980  crm_trace("Managed process %d exited: %p", child->pid, child);
981 
982  if (WIFEXITED(status)) {
983  exitcode = WEXITSTATUS(status);
984  crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
985 
986  } else if (WIFSIGNALED(status)) {
987  signo = WTERMSIG(status);
988  crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
989  }
990 #ifdef WCOREDUMP
991  if (WCOREDUMP(status)) {
992  core = 1;
993  crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
994  }
995 #endif
996  }
997 
998  if (child->callback) {
999  child->callback(child, child->pid, core, signo, exitcode);
1000  }
1001  return TRUE;
1002 }
1003 
1004 static void
1005 child_death_dispatch(int signal)
1006 {
1007  GListPtr iter = child_list;
1008  gboolean exited;
1009 
1010  while(iter) {
1011  GListPtr saved = NULL;
1012  mainloop_child_t *child = iter->data;
1013  exited = child_waitpid(child, WNOHANG);
1014 
1015  saved = iter;
1016  iter = iter->next;
1017 
1018  if (exited == FALSE) {
1019  continue;
1020  }
1021  crm_trace("Removing process entry %p for %d", child, child->pid);
1022 
1023  child_list = g_list_remove_link(child_list, saved);
1024  g_list_free(saved);
1025  child_free(child);
1026  }
1027 }
1028 
1029 static gboolean
1030 child_signal_init(gpointer p)
1031 {
1032  crm_trace("Installed SIGCHLD handler");
1033  /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1034  mainloop_add_signal(SIGCHLD, child_death_dispatch);
1035 
1036  /* In case they terminated before the signal handler was installed */
1037  child_death_dispatch(SIGCHLD);
1038  return FALSE;
1039 }
1040 
1041 int
1043 {
1044  GListPtr iter;
1045  mainloop_child_t *child = NULL;
1046  mainloop_child_t *match = NULL;
1047  /* It is impossible to block SIGKILL, this allows us to
1048  * call waitpid without WNOHANG flag.*/
1049  int waitflags = 0, rc = 0;
1050 
1051  for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1052  child = iter->data;
1053  if (pid == child->pid) {
1054  match = child;
1055  }
1056  }
1057 
1058  if (match == NULL) {
1059  return FALSE;
1060  }
1061 
1062  rc = child_kill_helper(match);
1063  if(rc == -ESRCH) {
1064  /* It's gone, but hasn't shown up in waitpid() yet
1065  *
1066  * Wait until we get SIGCHLD and let child_death_dispatch()
1067  * clean it up as normal (so we get the correct return
1068  * code/status)
1069  *
1070  * The blocking alternative would be to call:
1071  * child_waitpid(match, 0);
1072  */
1073  crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1074  return TRUE;
1075 
1076  } else if(rc != 0) {
1077  /* If KILL for some other reason set the WNOHANG flag since we
1078  * can't be certain what happened.
1079  */
1080  waitflags = WNOHANG;
1081  }
1082 
1083  if (child_waitpid(match, waitflags) == FALSE) {
1084  /* not much we can do if this occurs */
1085  return FALSE;
1086  }
1087 
1088  child_list = g_list_remove(child_list, match);
1089  child_free(match);
1090  return TRUE;
1091 }
1092 
1093 /* Create/Log a new tracked process
1094  * To track a process group, use -pid
1095  */
1096 void
1097 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1098  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1099 {
1100  static bool need_init = TRUE;
1101  mainloop_child_t *child = g_new(mainloop_child_t, 1);
1102 
1103  child->pid = pid;
1104  child->timerid = 0;
1105  child->timeout = FALSE;
1106  child->privatedata = privatedata;
1107  child->callback = callback;
1108  child->flags = flags;
1109 
1110  if(desc) {
1111  child->desc = strdup(desc);
1112  }
1113 
1114  if (timeout) {
1115  child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1116  }
1117 
1118  child_list = g_list_append(child_list, child);
1119 
1120  if(need_init) {
1121  need_init = FALSE;
1122  /* SIGCHLD processing has to be invoked from mainloop.
1123  * We do not want it to be possible to both add a child pid
1124  * to mainloop, and have the pid's exit callback invoked within
1125  * the same callstack. */
1126  g_timeout_add(1, child_signal_init, NULL);
1127  }
1128 }
1129 
1130 void
1131 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1132  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1133 {
1134  mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1135 }
1136 
1137 struct mainloop_timer_s {
1138  guint id;
1139  guint period_ms;
1140  bool repeat;
1141  char *name;
1142  GSourceFunc cb;
1143  void *userdata;
1144 };
1145 
1146 struct mainloop_timer_s mainloop;
1147 
1148 static gboolean mainloop_timer_cb(gpointer user_data)
1149 {
1150  int id = 0;
1151  bool repeat = FALSE;
1152  struct mainloop_timer_s *t = user_data;
1153 
1154  CRM_ASSERT(t != NULL);
1155 
1156  id = t->id;
1157  t->id = 0; /* Ensure it's unset during callbacks so that
1158  * mainloop_timer_running() works as expected
1159  */
1160 
1161  if(t->cb) {
1162  crm_trace("Invoking callbacks for timer %s", t->name);
1163  repeat = t->repeat;
1164  if(t->cb(t->userdata) == FALSE) {
1165  crm_trace("Timer %s complete", t->name);
1166  repeat = FALSE;
1167  }
1168  }
1169 
1170  if(repeat) {
1171  /* Restore if repeating */
1172  t->id = id;
1173  }
1174 
1175  return repeat;
1176 }
1177 
1179 {
1180  if(t && t->id != 0) {
1181  return TRUE;
1182  }
1183  return FALSE;
1184 }
1185 
1187 {
1189  if(t && t->period_ms > 0) {
1190  crm_trace("Starting timer %s", t->name);
1191  t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1192  }
1193 }
1194 
1196 {
1197  if(t && t->id != 0) {
1198  crm_trace("Stopping timer %s", t->name);
1199  g_source_remove(t->id);
1200  t->id = 0;
1201  }
1202 }
1203 
1205 {
1206  guint last = 0;
1207 
1208  if(t) {
1209  last = t->period_ms;
1210  t->period_ms = period_ms;
1211  }
1212 
1213  if(t && t->id != 0 && last != t->period_ms) {
1215  }
1216  return last;
1217 }
1218 
1219 
1221 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1222 {
1223  mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1224 
1225  if(t) {
1226  if(name) {
1227  t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1228  } else {
1229  t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1230  }
1231  t->id = 0;
1232  t->period_ms = period_ms;
1233  t->repeat = repeat;
1234  t->cb = cb;
1235  t->userdata = userdata;
1236  crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1237  }
1238  return t;
1239 }
1240 
1241 void
1243 {
1244  if(t) {
1245  crm_trace("Destroying timer %s", t->name);
1247  free(t->name);
1248  free(t);
1249  }
1250 }
1251 
#define LOG_TRACE
Definition: logging.h:29
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
Definition: ipc.c:873
A dumping ground.
#define crm_notice(fmt, args...)
Definition: logging.h:250
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
Definition: logging.h:247
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:327
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:810
void mainloop_timer_start(mainloop_timer_t *t)
Definition: mainloop.c:1186
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition: mainloop.c:1204
mainloop_child_flags
Definition: mainloop.h:29
void mainloop_timer_del(mainloop_timer_t *t)
Definition: mainloop.c:1242
gboolean mainloop_child_kill(pid_t pid)
Definition: mainloop.c:1042
int crm_ipc_get_fd(crm_ipc_t *client)
Definition: ipc.c:942
const char * pcmk_strerror(int rc)
Definition: logging.c:1135
struct mainloop_timer_s mainloop_timer_t
Definition: mainloop.h:37
struct mainloop_io_s mainloop_io_t
Definition: mainloop.h:35
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *userdata, enum mainloop_child_flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1097
struct mainloop_child_s mainloop_child_t
Definition: mainloop.h:36
void mainloop_set_trigger(crm_trigger_t *source)
Definition: mainloop.c:225
void mainloop_cleanup(void)
Definition: mainloop.c:411
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:90
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition: mainloop.c:870
long crm_ipc_read(crm_ipc_t *client)
Definition: ipc.c:1050
uint32_t pid
Definition: internal.h:49
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition: mainloop.c:233
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition: mainloop.c:556
void mainloop_timer_stop(mainloop_timer_t *t)
Definition: mainloop.c:1195
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
Definition: ipc.c:252
const char * crm_ipc_buffer(crm_ipc_t *client)
Definition: ipc.c:1096
struct trigger_s crm_trigger_t
Definition: mainloop.h:34
void(* destroy)(gpointer)
Definition: mainloop.h:74
#define crm_warn(fmt, args...)
Definition: logging.h:249
uint32_t id
Definition: internal.h:48
#define crm_debug(fmt, args...)
Definition: logging.h:253
void * mainloop_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:888
struct crm_ipc_s crm_ipc_t
Definition: ipc.h:61
void(* destroy)(gpointer userdata)
Definition: mainloop.h:91
gboolean crm_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:302
#define crm_trace(fmt, args...)
Definition: logging.h:254
Wrappers for and extensions to libxml2.
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Definition: mainloop.c:213
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:795
unsigned int crm_ipc_default_buffer_size(void)
Definition: ipc.c:67
void crm_ipc_destroy(crm_ipc_t *client)
Definition: ipc.c:919
bool mainloop_timer_running(mainloop_timer_t *t)
Definition: mainloop.c:1178
bool crm_ipc_connected(crm_ipc_t *client)
Definition: ipc.c:956
#define CRM_XS
Definition: logging.h:42
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *userdata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1131
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition: mainloop.c:201
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:801
#define crm_err(fmt, args...)
Definition: logging.h:248
struct mainloop_timer_s mainloop
Definition: mainloop.c:1146
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:894
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Definition: ipc.c:845
#define CRM_ASSERT(expr)
Definition: error.h:35
char data[0]
Definition: internal.h:58
void mainloop_del_fd(mainloop_io_t *client)
Definition: mainloop.c:854
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition: mainloop.c:619
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition: mainloop.c:1221
gboolean mainloop_destroy_signal(int sig)
Definition: mainloop.c:385
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition: mainloop.c:767
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition: mainloop.c:589
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__, 1, 2)))
void crm_ipc_close(crm_ipc_t *client)
Definition: ipc.c:904
const char * mainloop_child_name(mainloop_child_t *child)
Definition: mainloop.c:876
GList * GListPtr
Definition: crm.h:218
#define crm_info(fmt, args...)
Definition: logging.h:251
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Definition: mainloop.h:73
uint64_t flags
Definition: remote.c:156
enum crm_ais_msg_types type
Definition: internal.h:51
#define int32_t
Definition: stdint.in.h:157
int mainloop_child_timeout(mainloop_child_t *child)
Definition: mainloop.c:882