root/maint/gnulib/lib/pipe-filter-gi.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. reader_thread_func
  2. filter_init
  3. filter_loop
  4. filter_cleanup
  5. filter_init
  6. filter_loop
  7. filter_cleanup
  8. filter_terminate
  9. filter_retcode
  10. pipe_filter_gi_create
  11. pipe_filter_gi_write
  12. pipe_filter_gi_close

   1 /* Filtering of data through a subprocess.
   2    Copyright (C) 2001-2003, 2008-2021 Free Software Foundation, Inc.
   3    Written by Paolo Bonzini <bonzini@gnu.org>, 2009,
   4    and Bruno Haible <bruno@clisp.org>, 2009.
   5 
   6    This program is free software: you can redistribute it and/or modify
   7    it under the terms of the GNU General Public License as published by
   8    the Free Software Foundation; either version 3 of the License, or
   9    (at your option) any later version.
  10 
  11    This program is distributed in the hope that it will be useful,
  12    but WITHOUT ANY WARRANTY; without even the implied warranty of
  13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14    GNU General Public License for more details.
  15 
  16    You should have received a copy of the GNU General Public License
  17    along with this program.  If not, see <https://www.gnu.org/licenses/>.  */
  18 
  19 #include <config.h>
  20 
  21 #include "pipe-filter.h"
  22 
  23 #include <errno.h>
  24 #include <fcntl.h>
  25 #include <stdbool.h>
  26 #include <stdint.h>
  27 #include <stdlib.h>
  28 #include <unistd.h>
  29 #if defined _WIN32 && ! defined __CYGWIN__
  30 # include <windows.h>
  31 # include <process.h> /* _beginthreadex, _endthreadex */
  32 #else
  33 # include <signal.h>
  34 # include <sys/select.h>
  35 #endif
  36 
  37 #include "error.h"
  38 #include "spawn-pipe.h"
  39 #include "wait-process.h"
  40 #include "xalloc.h"
  41 #include "gettext.h"
  42 
  43 #define _(str) gettext (str)
  44 
  45 #include "pipe-filter-aux.h"
  46 
  47 struct pipe_filter_gi
  48 {
  49   /* Arguments passed to pipe_filter_gi_create.  */
  50   const char *progname;
  51   bool null_stderr;
  52   bool exit_on_error;
  53   prepare_read_fn prepare_read;
  54   done_read_fn done_read;
  55   void *private_data;
  56 
  57   /* Management of the subprocess.  */
  58   pid_t child;
  59   int fd[2];
  60   bool exited;
  61   int exitstatus;
  62 
  63   /* Status of the writer part.  */
  64   volatile bool writer_terminated;
  65   int writer_errno;
  66   /* Status of the reader part.  */
  67   volatile bool reader_terminated;
  68   volatile int reader_errno;
  69 
  70 #if defined _WIN32 && ! defined __CYGWIN__
  71   CRITICAL_SECTION lock; /* protects the volatile fields */
  72   HANDLE reader_thread_handle;
  73 #else
  74   struct sigaction orig_sigpipe_action;
  75   fd_set readfds;  /* All bits except fd[0] are always cleared.  */
  76   fd_set writefds; /* All bits except fd[1] are always cleared.  */
  77 #endif
  78 };
  79 
  80 
  81 /* Platform dependent functions.  */
  82 
  83 /* Perform additional initializations.
  84    Return 0 if successful, -1 upon failure.  */
  85 static int filter_init (struct pipe_filter_gi *filter);
  86 
  87 /* Write count bytes starting at buf, while at the same time invoking the
  88    read iterator (the functions prepare_read/done_read) when needed.  */
  89 static void filter_loop (struct pipe_filter_gi *filter,
  90                          const char *wbuf, size_t count);
  91 
  92 /* Perform cleanup actions at the end.
  93    finish_reading is true if there was no error, or false if some error
  94    occurred already.  */
  95 static void filter_cleanup (struct pipe_filter_gi *filter,
  96                             bool finish_reading);
  97 
  98 
  99 #if defined _WIN32 && ! defined __CYGWIN__
 100 /* Native Windows API.  */
 101 
 102 static unsigned int WINAPI
 103 reader_thread_func (void *thread_arg)
     /* [previous][next][first][last][top][bottom][index][help] */
 104 {
 105   struct pipe_filter_gi *filter = (struct pipe_filter_gi *) thread_arg;
 106 
 107   for (;;)
 108     {
 109       size_t bufsize;
 110       void *buf = filter->prepare_read (&bufsize, filter->private_data);
 111       if (!(buf != NULL && bufsize > 0))
 112         /* prepare_read returned wrong values.  */
 113         abort ();
 114       {
 115         ssize_t nread =
 116           read (filter->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 117         EnterCriticalSection (&filter->lock);
 118         /* If the writer already encountered an error, terminate.  */
 119         if (filter->writer_terminated)
 120           break;
 121         if (nread < 0)
 122           {
 123             filter->reader_errno = errno;
 124             break;
 125           }
 126         else if (nread > 0)
 127           filter->done_read (buf, nread, filter->private_data);
 128         else /* nread == 0 */
 129           break;
 130         LeaveCriticalSection (&filter->lock);
 131       }
 132     }
 133 
 134   filter->reader_terminated = true;
 135   LeaveCriticalSection (&filter->lock);
 136   _endthreadex (0); /* calls ExitThread (0) */
 137   abort ();
 138 }
 139 
 140 static int
 141 filter_init (struct pipe_filter_gi *filter)
     /* [previous][next][first][last][top][bottom][index][help] */
 142 {
 143   InitializeCriticalSection (&filter->lock);
 144   EnterCriticalSection (&filter->lock);
 145 
 146   filter->reader_thread_handle =
 147     (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, filter,
 148                              0, NULL);
 149 
 150   if (filter->reader_thread_handle == NULL)
 151     {
 152       if (filter->exit_on_error)
 153         error (EXIT_FAILURE, 0, _("creation of reading thread failed"));
 154       return -1;
 155     }
 156   else
 157     return 0;
 158 }
 159 
 160 static void
 161 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
     /* [previous][next][first][last][top][bottom][index][help] */
 162 {
 163   if (!filter->writer_terminated)
 164     {
 165       for (;;)
 166         {
 167           ssize_t nwritten;
 168 
 169           /* Allow the reader thread to continue.  */
 170           LeaveCriticalSection (&filter->lock);
 171 
 172           nwritten =
 173             write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
 174 
 175           /* Get the lock back from the reader thread.  */
 176           EnterCriticalSection (&filter->lock);
 177 
 178           if (nwritten < 0)
 179             {
 180               /* Don't assume that the gnulib modules 'write' and 'sigpipe' are
 181                  used.  */
 182               if (GetLastError () == ERROR_NO_DATA)
 183                 errno = EPIPE;
 184               filter->writer_errno = errno;
 185               filter->writer_terminated = true;
 186               break;
 187             }
 188           else if (nwritten > 0)
 189             {
 190               count -= nwritten;
 191               if (count == 0)
 192                 break;
 193               wbuf += nwritten;
 194             }
 195           else /* nwritten == 0 */
 196             {
 197               filter->writer_terminated = true;
 198               break;
 199             }
 200         }
 201     }
 202 }
 203 
 204 static void
 205 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
     /* [previous][next][first][last][top][bottom][index][help] */
 206 {
 207   if (finish_reading)
 208     {
 209       LeaveCriticalSection (&filter->lock);
 210       WaitForSingleObject (filter->reader_thread_handle, INFINITE);
 211     }
 212   else
 213     TerminateThread (filter->reader_thread_handle, 1);
 214 
 215   CloseHandle (filter->reader_thread_handle);
 216   DeleteCriticalSection (&filter->lock);
 217 }
 218 
 219 #else
 220 /* Unix API.  */
 221 
 222 static int
 223 filter_init (struct pipe_filter_gi *filter)
     /* [previous][next][first][last][top][bottom][index][help] */
 224 {
 225 #if !(defined _WIN32 && ! defined __CYGWIN__)
 226   /* When we write to the child process and it has just terminated,
 227      we don't want to die from a SIGPIPE signal.  So set the SIGPIPE
 228      handler to SIG_IGN, and handle EPIPE error codes in write().  */
 229   {
 230     struct sigaction sigpipe_action;
 231 
 232     sigpipe_action.sa_handler = SIG_IGN;
 233     sigpipe_action.sa_flags = 0;
 234     sigemptyset (&sigpipe_action.sa_mask);
 235     if (sigaction (SIGPIPE, &sigpipe_action, &filter->orig_sigpipe_action) < 0)
 236       abort ();
 237   }
 238 #endif
 239 
 240   /* Enable non-blocking I/O.  This permits the read() and write() calls
 241      to return -1/EAGAIN without blocking; this is important for polling
 242      if HAVE_SELECT is not defined.  It also permits the read() and write()
 243      calls to return after partial reads/writes; this is important if
 244      HAVE_SELECT is defined, because select() only says that some data
 245      can be read or written, not how many.  Without non-blocking I/O,
 246      Linux 2.2.17 and BSD systems prefer to block instead of returning
 247      with partial results.  */
 248   {
 249     int fcntl_flags;
 250 
 251     if ((fcntl_flags = fcntl (filter->fd[1], F_GETFL, 0)) < 0
 252         || fcntl (filter->fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
 253         || (fcntl_flags = fcntl (filter->fd[0], F_GETFL, 0)) < 0
 254         || fcntl (filter->fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
 255       {
 256         if (filter->exit_on_error)
 257           error (EXIT_FAILURE, errno,
 258                  _("cannot set up nonblocking I/O to %s subprocess"),
 259                  filter->progname);
 260         return -1;
 261       }
 262   }
 263 
 264   FD_ZERO (&filter->readfds);
 265   FD_ZERO (&filter->writefds);
 266 
 267   return 0;
 268 }
 269 
 270 static void
 271 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
     /* [previous][next][first][last][top][bottom][index][help] */
 272 {
 273   /* This function is used in two situations:
 274      - in order to write some data to the subprocess
 275        [done_writing = false],
 276      - in order to read the remaining data after everything was written
 277        [done_writing = true].  In this case buf is NULL and count is
 278        ignored.  */
 279   bool done_writing = (wbuf == NULL);
 280 
 281   if (!done_writing)
 282     {
 283       if (filter->writer_terminated || filter->reader_terminated)
 284         /* pipe_filter_gi_write was called when it should not be.  */
 285         abort ();
 286     }
 287   else
 288     {
 289       if (filter->reader_terminated)
 290         return;
 291     }
 292 
 293   /* Loop, trying to write the given buffer or reading, whichever is
 294      possible.  */
 295   for (;;)
 296     {
 297       /* Here filter->writer_terminated is false.  When it becomes true, this
 298          loop is terminated.  */
 299       /* Whereas filter->reader_terminated is initially false but may become
 300          true during this loop.  */
 301       /* Here, if !done_writing, count > 0.  When count becomes 0, this loop
 302          is terminated.  */
 303       /* Here, if done_writing, filter->reader_terminated is false.  When
 304          filter->reader_terminated becomes true, this loop is terminated.  */
 305 # if HAVE_SELECT
 306       int n, retval;
 307 
 308       /* See whether reading or writing is possible.  */
 309       n = 1;
 310       if (!filter->reader_terminated)
 311         {
 312           FD_SET (filter->fd[0], &filter->readfds);
 313           n = filter->fd[0] + 1;
 314         }
 315       if (!done_writing)
 316         {
 317           FD_SET (filter->fd[1], &filter->writefds);
 318           if (n <= filter->fd[1])
 319             n = filter->fd[1] + 1;
 320         }
 321       /* Do EINTR handling here instead of in pipe-filter-aux.h,
 322          because select() cannot be referred to from an inline
 323          function on AIX 7.1.  */
 324       do
 325         retval = select (n,
 326                          (!filter->reader_terminated ? &filter->readfds : NULL),
 327                          (!done_writing ? &filter->writefds : NULL),
 328                          NULL, NULL);
 329       while (retval < 0 && errno == EINTR);
 330       n = retval;
 331 
 332       if (n < 0)
 333         {
 334           if (filter->exit_on_error)
 335             error (EXIT_FAILURE, errno,
 336                    _("communication with %s subprocess failed"),
 337                    filter->progname);
 338           filter->writer_errno = errno;
 339           filter->writer_terminated = true;
 340           break;
 341         }
 342 
 343       if (!done_writing && FD_ISSET (filter->fd[1], &filter->writefds))
 344         goto try_write;
 345       if (!filter->reader_terminated
 346           && FD_ISSET (filter->fd[0], &filter->readfds))
 347         goto try_read;
 348       /* How could select() return if none of the two descriptors is ready?  */
 349       abort ();
 350 # endif
 351 
 352       /* Attempt to write.  */
 353 # if HAVE_SELECT
 354     try_write:
 355 # endif
 356       if (!done_writing)
 357         {
 358           ssize_t nwritten =
 359             write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
 360           if (nwritten < 0)
 361             {
 362               if (!IS_EAGAIN (errno))
 363                 {
 364                   if (filter->exit_on_error)
 365                     error (EXIT_FAILURE, errno,
 366                            _("write to %s subprocess failed"),
 367                            filter->progname);
 368                   filter->writer_errno = errno;
 369                   filter->writer_terminated = true;
 370                   break;
 371                 }
 372             }
 373           else if (nwritten > 0)
 374             {
 375               count -= nwritten;
 376               if (count == 0)
 377                 break;
 378               wbuf += nwritten;
 379             }
 380         }
 381 # if HAVE_SELECT
 382       continue;
 383 # endif
 384 
 385       /* Attempt to read.  */
 386 # if HAVE_SELECT
 387     try_read:
 388 # endif
 389       if (!filter->reader_terminated)
 390         {
 391           size_t bufsize;
 392           void *buf = filter->prepare_read (&bufsize, filter->private_data);
 393           if (!(buf != NULL && bufsize > 0))
 394             /* prepare_read returned wrong values.  */
 395             abort ();
 396           {
 397             ssize_t nread =
 398               read (filter->fd[0], buf,
 399                     bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 400             if (nread < 0)
 401               {
 402                 if (!IS_EAGAIN (errno))
 403                   {
 404                     if (filter->exit_on_error)
 405                       error (EXIT_FAILURE, errno,
 406                              _("read from %s subprocess failed"),
 407                              filter->progname);
 408                     filter->reader_errno = errno;
 409                     filter->reader_terminated = true;
 410                     break;
 411                   }
 412               }
 413             else if (nread > 0)
 414               filter->done_read (buf, nread, filter->private_data);
 415             else /* nread == 0 */
 416               {
 417                 filter->reader_terminated = true;
 418                 if (done_writing)
 419                   break;
 420               }
 421           }
 422       }
 423 # if HAVE_SELECT
 424       continue;
 425 # endif
 426     }
 427 }
 428 
 429 static void
 430 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
     /* [previous][next][first][last][top][bottom][index][help] */
 431 {
 432   if (finish_reading)
 433     /* A select loop, with done_writing = true.  */
 434     filter_loop (filter, NULL, 0);
 435 
 436   if (sigaction (SIGPIPE, &filter->orig_sigpipe_action, NULL) < 0)
 437     abort ();
 438 }
 439 
 440 #endif
 441 
 442 
 443 /* Terminate the child process.  Do nothing if it already exited.  */
 444 static void
 445 filter_terminate (struct pipe_filter_gi *filter)
     /* [previous][next][first][last][top][bottom][index][help] */
 446 {
 447   if (!filter->exited)
 448     {
 449       /* Tell the child there is nothing more the parent will send.  */
 450       close (filter->fd[1]);
 451       filter_cleanup (filter, !filter->reader_terminated);
 452       close (filter->fd[0]);
 453       filter->exitstatus =
 454         wait_subprocess (filter->child, filter->progname, true,
 455                          filter->null_stderr, true, filter->exit_on_error,
 456                          NULL);
 457       if (filter->exitstatus != 0 && filter->exit_on_error)
 458         error (EXIT_FAILURE, 0,
 459                _("subprocess %s terminated with exit code %d"),
 460                filter->progname, filter->exitstatus);
 461       filter->exited = true;
 462     }
 463 }
 464 
 465 /* After filter_terminate:
 466    Return 0 upon success, or (only if exit_on_error is false):
 467    - -1 with errno set upon failure,
 468    - the positive exit code of the subprocess if that failed.  */
 469 static int
 470 filter_retcode (struct pipe_filter_gi *filter)
     /* [previous][next][first][last][top][bottom][index][help] */
 471 {
 472   if (filter->writer_errno != 0)
 473     {
 474       errno = filter->writer_errno;
 475       return -1;
 476     }
 477   else if (filter->reader_errno != 0)
 478     {
 479       errno = filter->reader_errno;
 480       return -1;
 481     }
 482   else
 483     return filter->exitstatus;
 484 }
 485 
 486 struct pipe_filter_gi *
 487 pipe_filter_gi_create (const char *progname,
     /* [previous][next][first][last][top][bottom][index][help] */
 488                        const char *prog_path, const char * const *prog_argv,
 489                        bool null_stderr, bool exit_on_error,
 490                        prepare_read_fn prepare_read,
 491                        done_read_fn done_read,
 492                        void *private_data)
 493 {
 494   struct pipe_filter_gi *filter;
 495 
 496   filter =
 497     (struct pipe_filter_gi *) xmalloc (sizeof (struct pipe_filter_gi));
 498 
 499   /* Open a bidirectional pipe to a subprocess.  */
 500   filter->child = create_pipe_bidi (progname, prog_path, prog_argv,
 501                                     NULL, null_stderr, true, exit_on_error,
 502                                     filter->fd);
 503   filter->progname = progname;
 504   filter->null_stderr = null_stderr;
 505   filter->exit_on_error = exit_on_error;
 506   filter->prepare_read = prepare_read;
 507   filter->done_read = done_read;
 508   filter->private_data = private_data;
 509   filter->exited = false;
 510   filter->exitstatus = 0;
 511   filter->writer_terminated = false;
 512   filter->writer_errno = 0;
 513   filter->reader_terminated = false;
 514   filter->reader_errno = 0;
 515 
 516   if (filter->child == -1)
 517     {
 518       /* Child process could not be created.
 519          Arrange for filter_retcode (filter) to be the current errno.  */
 520       filter->writer_errno = errno;
 521       filter->writer_terminated = true;
 522       filter->exited = true;
 523     }
 524   else if (filter_init (filter) < 0)
 525     filter_terminate (filter);
 526 
 527   return filter;
 528 }
 529 
 530 int
 531 pipe_filter_gi_write (struct pipe_filter_gi *filter,
     /* [previous][next][first][last][top][bottom][index][help] */
 532                       const void *buf, size_t size)
 533 {
 534   if (buf == NULL)
 535     /* Invalid argument.  */
 536     abort ();
 537 
 538   if (filter->exited)
 539     return filter_retcode (filter);
 540 
 541   if (size > 0)
 542     {
 543       filter_loop (filter, buf, size);
 544       if (filter->writer_terminated || filter->reader_terminated)
 545         {
 546           filter_terminate (filter);
 547           return filter_retcode (filter);
 548         }
 549     }
 550   return 0;
 551 }
 552 
 553 int
 554 pipe_filter_gi_close (struct pipe_filter_gi *filter)
     /* [previous][next][first][last][top][bottom][index][help] */
 555 {
 556   int ret;
 557 
 558   filter_terminate (filter);
 559   ret = filter_retcode (filter);
 560   free (filter);
 561   return ret;
 562 }

/* [previous][next][first][last][top][bottom][index][help] */