root/maint/gnulib/lib/pipe-filter-ii.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. start_wrapper
  2. _beginthreadex
  3. CloseHandle
  4. WaitForSingleObject
  5. WaitForMultipleObjects
  6. writer_thread_func
  7. reader_thread_func
  8. pipe_filter_ii_execute

   1 /* Filtering of data through a subprocess.
   2    Copyright (C) 2001-2003, 2008-2021 Free Software Foundation, Inc.
   3    Written by Bruno Haible <bruno@clisp.org>, 2009.
   4 
   5    This program is free software: you can redistribute it and/or modify
   6    it under the terms of the GNU General Public License as published by
   7    the Free Software Foundation; either version 3 of the License, or
   8    (at your option) any later version.
   9 
  10    This program is distributed in the hope that it will be useful,
  11    but WITHOUT ANY WARRANTY; without even the implied warranty of
  12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13    GNU General Public License for more details.
  14 
  15    You should have received a copy of the GNU General Public License
  16    along with this program.  If not, see <https://www.gnu.org/licenses/>.  */
  17 
  18 #include <config.h>
  19 
  20 #include "pipe-filter.h"
  21 
  22 #include <errno.h>
  23 #include <fcntl.h>
  24 #include <stdbool.h>
  25 #include <stdint.h>
  26 #include <stdlib.h>
  27 #include <unistd.h>
  28 #if defined _WIN32 && ! defined __CYGWIN__
  29 # include <windows.h>
  30 # include <process.h> /* _beginthreadex, _endthreadex */
  31 #elif defined __KLIBC__
  32 # define INCL_DOS
  33 # include <os2.h>
  34 
  35 /* Simple implementation of Win32 APIs */
  36 
  37 # define WINAPI
  38 
  39 typedef struct _HANDLE
  40 {
  41   TID tid;
  42   HEV hevDone;
  43   unsigned int WINAPI (*start) (void *);
  44   void *arg;
  45 } *HANDLE;
  46 
  47 typedef ULONG DWORD;
  48 
  49 static void
  50 start_wrapper (void *arg)
     /* [previous][next][first][last][top][bottom][index][help] */
  51 {
  52   HANDLE h = (HANDLE) arg;
  53 
  54   h->start (h->arg);
  55 
  56   DosPostEventSem (h->hevDone);
  57   _endthread ();
  58 }
  59 
  60 static HANDLE
  61 _beginthreadex (void *s, unsigned n, unsigned int WINAPI (*start) (void *),
     /* [previous][next][first][last][top][bottom][index][help] */
  62                 void *arg, unsigned fl, unsigned *th)
  63 {
  64   HANDLE h;
  65 
  66   h = malloc (sizeof (*h));
  67   if (!h)
  68     return NULL;
  69 
  70   if (DosCreateEventSem (NULL, &h->hevDone, 0, FALSE))
  71     goto exit_free;
  72 
  73   h->start = start;
  74   h->arg = arg;
  75 
  76   h->tid = _beginthread (start_wrapper, NULL, n, (void *) h);
  77   if (h->tid == -1)
  78     goto exit_close_event_sem;
  79 
  80   return h;
  81 
  82  exit_close_event_sem:
  83   DosCloseEventSem (h->hevDone);
  84 
  85  exit_free:
  86   free (h);
  87 
  88   return NULL;
  89 }
  90 
  91 static BOOL
  92 CloseHandle (HANDLE h)
     /* [previous][next][first][last][top][bottom][index][help] */
  93 {
  94   DosCloseEventSem (h->hevDone);
  95   free (h);
  96 }
  97 
  98 # define _endthreadex(x) return (x)
  99 # define TerminateThread(h, e) DosKillThread (h->tid)
 100 
 101 # define GetLastError()  -1
 102 
 103 # ifndef ERROR_NO_DATA
 104 #  define ERROR_NO_DATA 232
 105 # endif
 106 
 107 # define INFINITE SEM_INDEFINITE_WAIT
 108 # define WAIT_OBJECT_0  0
 109 
 110 static DWORD
 111 WaitForSingleObject (HANDLE h, DWORD ms)
     /* [previous][next][first][last][top][bottom][index][help] */
 112 {
 113   return DosWaitEventSem (h->hevDone, ms) == 0 ? WAIT_OBJECT_0 : (DWORD) -1;
 114 }
 115 
 116 static DWORD
 117 WaitForMultipleObjects (DWORD nCount, const HANDLE *pHandles, BOOL bWaitAll,
     /* [previous][next][first][last][top][bottom][index][help] */
 118                         DWORD ms)
 119 {
 120   HMUX hmux;
 121   PSEMRECORD psr;
 122   ULONG ulUser;
 123   ULONG rc = (ULONG) -1;
 124   DWORD i;
 125 
 126   psr = malloc (sizeof (*psr) * nCount);
 127   if (!psr)
 128     goto exit_return;
 129 
 130   for (i = 0; i < nCount; ++i)
 131     {
 132       psr[i].hsemCur = (HSEM) pHandles[i]->hevDone;
 133       psr[i].ulUser  = WAIT_OBJECT_0 + i;
 134     }
 135 
 136   if (DosCreateMuxWaitSem (NULL, &hmux, nCount, psr,
 137                            bWaitAll ? DCMW_WAIT_ALL : DCMW_WAIT_ANY))
 138     goto exit_free;
 139 
 140   rc = DosWaitMuxWaitSem (hmux, ms, &ulUser);
 141   DosCloseMuxWaitSem (hmux);
 142 
 143  exit_free:
 144   free (psr);
 145 
 146  exit_return:
 147   if (rc)
 148     return (DWORD) -1;
 149 
 150   return ulUser;
 151 }
 152 #else
 153 # include <signal.h>
 154 # include <sys/select.h>
 155 #endif
 156 
 157 #include "error.h"
 158 #include "spawn-pipe.h"
 159 #include "wait-process.h"
 160 #include "gettext.h"
 161 
 162 #define _(str) gettext (str)
 163 
 164 #include "pipe-filter-aux.h"
 165 
 166 #if (defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__
 167 
 168 struct locals
 169 {
 170   /* Arguments passed to pipe_filter_ii_execute.  */
 171   prepare_write_fn prepare_write;
 172   done_write_fn done_write;
 173   prepare_read_fn prepare_read;
 174   done_read_fn done_read;
 175 
 176   /* Management of the subprocess.  */
 177   void *private_data;
 178   int fd[2];
 179 
 180   /* Status of the writer part.  */
 181   volatile bool writer_terminated;
 182   volatile int writer_errno;
 183   /* Status of the reader part.  */
 184   volatile bool reader_terminated;
 185   volatile int reader_errno;
 186 };
 187 
 188 static unsigned int WINAPI
 189 writer_thread_func (void *thread_arg)
     /* [previous][next][first][last][top][bottom][index][help] */
 190 {
 191   struct locals *l = (struct locals *) thread_arg;
 192 
 193   for (;;)
 194     {
 195       size_t bufsize;
 196       const void *buf = l->prepare_write (&bufsize, l->private_data);
 197       if (buf != NULL)
 198         {
 199           ssize_t nwritten =
 200             write (l->fd[1], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 201           if (nwritten < 0)
 202             {
 203               /* Don't assume that the gnulib modules 'write' and 'sigpipe' are
 204                  used.  */
 205               if (GetLastError () == ERROR_NO_DATA)
 206                 errno = EPIPE;
 207               l->writer_errno = errno;
 208               break;
 209             }
 210           else if (nwritten > 0)
 211             l->done_write ((void *) buf, nwritten, l->private_data);
 212         }
 213       else
 214         break;
 215     }
 216 
 217   l->writer_terminated = true;
 218   _endthreadex (0); /* calls ExitThread (0) */
 219   abort ();
 220 }
 221 
 222 static unsigned int WINAPI
 223 reader_thread_func (void *thread_arg)
     /* [previous][next][first][last][top][bottom][index][help] */
 224 {
 225   struct locals *l = (struct locals *) thread_arg;
 226 
 227   for (;;)
 228     {
 229       size_t bufsize;
 230       void *buf = l->prepare_read (&bufsize, l->private_data);
 231       if (!(buf != NULL && bufsize > 0))
 232         /* prepare_read returned wrong values.  */
 233         abort ();
 234       {
 235         ssize_t nread =
 236           read (l->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 237         if (nread < 0)
 238           {
 239             l->reader_errno = errno;
 240             break;
 241           }
 242         else if (nread > 0)
 243           l->done_read (buf, nread, l->private_data);
 244         else /* nread == 0 */
 245           break;
 246       }
 247     }
 248 
 249   l->reader_terminated = true;
 250   _endthreadex (0); /* calls ExitThread (0) */
 251   abort ();
 252 }
 253 
 254 #endif
 255 
 256 int
 257 pipe_filter_ii_execute (const char *progname,
     /* [previous][next][first][last][top][bottom][index][help] */
 258                         const char *prog_path, const char * const *prog_argv,
 259                         bool null_stderr, bool exit_on_error,
 260                         prepare_write_fn prepare_write,
 261                         done_write_fn done_write,
 262                         prepare_read_fn prepare_read,
 263                         done_read_fn done_read,
 264                         void *private_data)
 265 {
 266   pid_t child;
 267   int fd[2];
 268 #if !((defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__)
 269   struct sigaction orig_sigpipe_action;
 270 #endif
 271 
 272   /* Open a bidirectional pipe to a subprocess.  */
 273   child = create_pipe_bidi (progname, prog_path, prog_argv,
 274                             NULL, null_stderr, true, exit_on_error,
 275                             fd);
 276   if (child == -1)
 277     return -1;
 278 
 279 #if (defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__
 280   /* Native Windows API.  */
 281   /* Pipes have a non-blocking mode, see function SetNamedPipeHandleState and
 282      the article "Named Pipe Type, Read, and Wait Modes", but Microsoft's
 283      documentation discourages its use.  So don't use it.
 284      Asynchronous I/O is also not suitable because it notifies the caller only
 285      about completion of the I/O request, not about intermediate progress.
 286      So do the writing and the reading in separate threads.  */
 287   {
 288     struct locals l;
 289     HANDLE handles[2];
 290     #define writer_thread_handle handles[0]
 291     #define reader_thread_handle handles[1]
 292     bool writer_cleaned_up;
 293     bool reader_cleaned_up;
 294 
 295     l.prepare_write = prepare_write;
 296     l.done_write = done_write;
 297     l.prepare_read = prepare_read;
 298     l.done_read = done_read;
 299     l.private_data = private_data;
 300     l.fd[0] = fd[0];
 301     l.fd[1] = fd[1];
 302     l.writer_terminated = false;
 303     l.writer_errno = 0;
 304     l.reader_terminated = false;
 305     l.reader_errno = 0;
 306 
 307     writer_thread_handle =
 308       (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, &l, 0, NULL);
 309     reader_thread_handle =
 310       (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, &l, 0, NULL);
 311     if (writer_thread_handle == NULL || reader_thread_handle == NULL)
 312       {
 313         if (exit_on_error)
 314           error (EXIT_FAILURE, 0, _("creation of threads failed"));
 315         if (reader_thread_handle != NULL)
 316           CloseHandle (reader_thread_handle);
 317         if (writer_thread_handle != NULL)
 318           CloseHandle (writer_thread_handle);
 319         goto fail;
 320       }
 321     writer_cleaned_up = false;
 322     reader_cleaned_up = false;
 323     for (;;)
 324       {
 325         DWORD ret;
 326 
 327         /* Here !(writer_cleaned_up && reader_cleaned_up).  */
 328         if (writer_cleaned_up)
 329           ret = WaitForSingleObject (reader_thread_handle, INFINITE);
 330         else if (reader_cleaned_up)
 331           ret = WaitForSingleObject (writer_thread_handle, INFINITE);
 332         else
 333           ret = WaitForMultipleObjects (2, handles, FALSE, INFINITE);
 334         if (!(ret == WAIT_OBJECT_0 + 0 || ret == WAIT_OBJECT_0 + 1))
 335           abort ();
 336 
 337         if (l.writer_terminated)
 338           {
 339             /* The writer thread has just terminated.  */
 340             l.writer_terminated = false;
 341             CloseHandle (writer_thread_handle);
 342             if (l.writer_errno)
 343               {
 344                 if (exit_on_error)
 345                   error (EXIT_FAILURE, l.writer_errno,
 346                          _("write to %s subprocess failed"), progname);
 347                 if (!reader_cleaned_up)
 348                   {
 349                     TerminateThread (reader_thread_handle, 1);
 350                     CloseHandle (reader_thread_handle);
 351                   }
 352                 goto fail;
 353               }
 354             /* Tell the child there is nothing more the parent will send.  */
 355             close (fd[1]);
 356             writer_cleaned_up = true;
 357           }
 358         if (l.reader_terminated)
 359           {
 360             /* The reader thread has just terminated.  */
 361             l.reader_terminated = false;
 362             CloseHandle (reader_thread_handle);
 363             if (l.reader_errno)
 364               {
 365                 if (exit_on_error)
 366                   error (EXIT_FAILURE, l.reader_errno,
 367                          _("read from %s subprocess failed"), progname);
 368                 if (!writer_cleaned_up)
 369                   {
 370                     TerminateThread (writer_thread_handle, 1);
 371                     CloseHandle (writer_thread_handle);
 372                   }
 373                 goto fail;
 374               }
 375             reader_cleaned_up = true;
 376           }
 377         if (writer_cleaned_up && reader_cleaned_up)
 378           break;
 379       }
 380   }
 381 #else
 382   /* When we write to the child process and it has just terminated,
 383      we don't want to die from a SIGPIPE signal.  So set the SIGPIPE
 384      handler to SIG_IGN, and handle EPIPE error codes in write().  */
 385   {
 386     struct sigaction sigpipe_action;
 387 
 388     sigpipe_action.sa_handler = SIG_IGN;
 389     sigpipe_action.sa_flags = 0;
 390     sigemptyset (&sigpipe_action.sa_mask);
 391     if (sigaction (SIGPIPE, &sigpipe_action, &orig_sigpipe_action) < 0)
 392       abort ();
 393   }
 394 
 395   {
 396 # if HAVE_SELECT
 397     fd_set readfds;  /* All bits except fd[0] are always cleared.  */
 398     fd_set writefds; /* All bits except fd[1] are always cleared.  */
 399 # endif
 400     bool done_writing;
 401 
 402     /* Enable non-blocking I/O.  This permits the read() and write() calls
 403        to return -1/EAGAIN without blocking; this is important for polling
 404        if HAVE_SELECT is not defined.  It also permits the read() and write()
 405        calls to return after partial reads/writes; this is important if
 406        HAVE_SELECT is defined, because select() only says that some data
 407        can be read or written, not how many.  Without non-blocking I/O,
 408        Linux 2.2.17 and BSD systems prefer to block instead of returning
 409        with partial results.  */
 410     {
 411       int fcntl_flags;
 412 
 413       if ((fcntl_flags = fcntl (fd[1], F_GETFL, 0)) < 0
 414           || fcntl (fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
 415           || (fcntl_flags = fcntl (fd[0], F_GETFL, 0)) < 0
 416           || fcntl (fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
 417         {
 418           if (exit_on_error)
 419             error (EXIT_FAILURE, errno,
 420                    _("cannot set up nonblocking I/O to %s subprocess"),
 421                    progname);
 422           goto fail;
 423         }
 424     }
 425 
 426 # if HAVE_SELECT
 427     FD_ZERO (&readfds);
 428     FD_ZERO (&writefds);
 429 # endif
 430     done_writing = false;
 431     for (;;)
 432       {
 433 # if HAVE_SELECT
 434         int n, retval;
 435 
 436         FD_SET (fd[0], &readfds);
 437         n = fd[0] + 1;
 438         if (!done_writing)
 439           {
 440             FD_SET (fd[1], &writefds);
 441             if (n <= fd[1])
 442               n = fd[1] + 1;
 443           }
 444 
 445         /* Do EINTR handling here instead of in pipe-filter-aux.h,
 446            because select() cannot be referred to from an inline
 447            function on AIX 7.1.  */
 448         do
 449           retval = select (n, &readfds, (!done_writing ? &writefds : NULL),
 450                            NULL, NULL);
 451         while (retval < 0 && errno == EINTR);
 452         n = retval;
 453 
 454         if (n < 0)
 455           {
 456             if (exit_on_error)
 457               error (EXIT_FAILURE, errno,
 458                      _("communication with %s subprocess failed"), progname);
 459             goto fail;
 460           }
 461         if (!done_writing && FD_ISSET (fd[1], &writefds))
 462           goto try_write;
 463         if (FD_ISSET (fd[0], &readfds))
 464           goto try_read;
 465         /* How could select() return if none of the two descriptors is ready?  */
 466         abort ();
 467 # endif
 468 
 469         /* Attempt to write.  */
 470 # if HAVE_SELECT
 471       try_write:
 472 # endif
 473         if (!done_writing)
 474           {
 475             size_t bufsize;
 476             const void *buf = prepare_write (&bufsize, private_data);
 477             if (buf != NULL)
 478               {
 479                 /* Writing to a pipe in non-blocking mode is tricky: The
 480                    write() call may fail with EAGAIN, simply because sufficient
 481                    space is not available in the pipe. See POSIX:2008
 482                    <https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html>.
 483                    This happens actually on AIX and IRIX, when bufsize >= 8192
 484                    (even though PIPE_BUF and pathconf ("/", _PC_PIPE_BUF) are
 485                    both 32768).  */
 486                 size_t attempt_to_write =
 487                   (bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 488                 for (;;)
 489                   {
 490                     ssize_t nwritten = write (fd[1], buf, attempt_to_write);
 491                     if (nwritten < 0)
 492                       {
 493                         if (errno == EAGAIN)
 494                           {
 495                             attempt_to_write = attempt_to_write / 2;
 496                             if (attempt_to_write == 0)
 497                               break;
 498                           }
 499                         else if (!IS_EAGAIN (errno))
 500                           {
 501                             if (exit_on_error)
 502                               error (EXIT_FAILURE, errno,
 503                                      _("write to %s subprocess failed"),
 504                                      progname);
 505                             goto fail;
 506                           }
 507                       }
 508                     else
 509                       {
 510                         if (nwritten > 0)
 511                           done_write ((void *) buf, nwritten, private_data);
 512                         break;
 513                       }
 514                   }
 515               }
 516             else
 517               {
 518                 /* Tell the child there is nothing more the parent will send.  */
 519                 close (fd[1]);
 520                 done_writing = true;
 521               }
 522           }
 523 # if HAVE_SELECT
 524         continue;
 525 # endif
 526 
 527         /* Attempt to read.  */
 528 # if HAVE_SELECT
 529       try_read:
 530 # endif
 531         {
 532           size_t bufsize;
 533           void *buf = prepare_read (&bufsize, private_data);
 534           if (!(buf != NULL && bufsize > 0))
 535             /* prepare_read returned wrong values.  */
 536             abort ();
 537           {
 538             ssize_t nread =
 539               read (fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 540             if (nread < 0)
 541               {
 542                 if (!IS_EAGAIN (errno))
 543                   {
 544                     if (exit_on_error)
 545                       error (EXIT_FAILURE, errno,
 546                              _("read from %s subprocess failed"), progname);
 547                     goto fail;
 548                   }
 549               }
 550             else if (nread > 0)
 551               done_read (buf, nread, private_data);
 552             else /* nread == 0 */
 553               {
 554                 if (done_writing)
 555                   break;
 556               }
 557           }
 558         }
 559 # if HAVE_SELECT
 560         continue;
 561 # endif
 562       }
 563   }
 564 
 565   /* Restore SIGPIPE signal handler.  */
 566   if (sigaction (SIGPIPE, &orig_sigpipe_action, NULL) < 0)
 567     abort ();
 568 #endif
 569 
 570   close (fd[0]);
 571 
 572   /* Remove zombie process from process list.  */
 573   {
 574     int exitstatus =
 575       wait_subprocess (child, progname, false, null_stderr,
 576                        true, exit_on_error, NULL);
 577     if (exitstatus != 0 && exit_on_error)
 578       error (EXIT_FAILURE, 0, _("%s subprocess terminated with exit code %d"),
 579              progname, exitstatus);
 580     return exitstatus;
 581   }
 582 
 583  fail:
 584   {
 585     int saved_errno = errno;
 586     close (fd[1]);
 587 #if !((defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__)
 588     if (sigaction (SIGPIPE, &orig_sigpipe_action, NULL) < 0)
 589       abort ();
 590 #endif
 591     close (fd[0]);
 592     wait_subprocess (child, progname, true, true, true, false, NULL);
 593     errno = saved_errno;
 594     return -1;
 595   }
 596 }

/* [previous][next][first][last][top][bottom][index][help] */