This source file includes following definitions.
- reader_thread_func
 
- filter_init
 
- filter_loop
 
- filter_cleanup
 
- filter_init
 
- filter_loop
 
- filter_cleanup
 
- filter_terminate
 
- filter_retcode
 
- pipe_filter_gi_create
 
- pipe_filter_gi_write
 
- pipe_filter_gi_close
 
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 #include <config.h>
  20 
  21 #include "pipe-filter.h"
  22 
  23 #include <errno.h>
  24 #include <fcntl.h>
  25 #include <stdbool.h>
  26 #include <stdint.h>
  27 #include <stdlib.h>
  28 #include <unistd.h>
  29 #if defined _WIN32 && ! defined __CYGWIN__
  30 # include <windows.h>
  31 # include <process.h> 
  32 #else
  33 # include <signal.h>
  34 # include <sys/select.h>
  35 #endif
  36 
  37 #include "error.h"
  38 #include "spawn-pipe.h"
  39 #include "wait-process.h"
  40 #include "xalloc.h"
  41 #include "gettext.h"
  42 
  43 #define _(str) gettext (str)
  44 
  45 #include "pipe-filter-aux.h"
  46 
  47 struct pipe_filter_gi
  48 {
  49   
  50   const char *progname;
  51   bool null_stderr;
  52   bool exit_on_error;
  53   prepare_read_fn prepare_read;
  54   done_read_fn done_read;
  55   void *private_data;
  56 
  57   
  58   pid_t child;
  59   int fd[2];
  60   bool exited;
  61   int exitstatus;
  62 
  63   
  64   volatile bool writer_terminated;
  65   int writer_errno;
  66   
  67   volatile bool reader_terminated;
  68   volatile int reader_errno;
  69 
  70 #if defined _WIN32 && ! defined __CYGWIN__
  71   CRITICAL_SECTION lock; 
  72   HANDLE reader_thread_handle;
  73 #else
  74   struct sigaction orig_sigpipe_action;
  75   fd_set readfds;  
  76   fd_set writefds; 
  77 #endif
  78 };
  79 
  80 
  81 
  82 
  83 
  84 
  85 static int filter_init (struct pipe_filter_gi *filter);
  86 
  87 
  88 
  89 static void filter_loop (struct pipe_filter_gi *filter,
  90                          const char *wbuf, size_t count);
  91 
  92 
  93 
  94 
  95 static void filter_cleanup (struct pipe_filter_gi *filter,
  96                             bool finish_reading);
  97 
  98 
  99 #if defined _WIN32 && ! defined __CYGWIN__
 100 
 101 
 102 static unsigned int WINAPI
 103 reader_thread_func (void *thread_arg)
     
 104 {
 105   struct pipe_filter_gi *filter = (struct pipe_filter_gi *) thread_arg;
 106 
 107   for (;;)
 108     {
 109       size_t bufsize;
 110       void *buf = filter->prepare_read (&bufsize, filter->private_data);
 111       if (!(buf != NULL && bufsize > 0))
 112         
 113         abort ();
 114       {
 115         ssize_t nread =
 116           read (filter->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 117         EnterCriticalSection (&filter->lock);
 118         
 119         if (filter->writer_terminated)
 120           break;
 121         if (nread < 0)
 122           {
 123             filter->reader_errno = errno;
 124             break;
 125           }
 126         else if (nread > 0)
 127           filter->done_read (buf, nread, filter->private_data);
 128         else 
 129           break;
 130         LeaveCriticalSection (&filter->lock);
 131       }
 132     }
 133 
 134   filter->reader_terminated = true;
 135   LeaveCriticalSection (&filter->lock);
 136   _endthreadex (0); 
 137   abort ();
 138 }
 139 
 140 static int
 141 filter_init (struct pipe_filter_gi *filter)
     
 142 {
 143   InitializeCriticalSection (&filter->lock);
 144   EnterCriticalSection (&filter->lock);
 145 
 146   filter->reader_thread_handle =
 147     (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, filter,
 148                              0, NULL);
 149 
 150   if (filter->reader_thread_handle == NULL)
 151     {
 152       if (filter->exit_on_error)
 153         error (EXIT_FAILURE, 0, _("creation of reading thread failed"));
 154       return -1;
 155     }
 156   else
 157     return 0;
 158 }
 159 
 160 static void
 161 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
     
 162 {
 163   if (!filter->writer_terminated)
 164     {
 165       for (;;)
 166         {
 167           ssize_t nwritten;
 168 
 169           
 170           LeaveCriticalSection (&filter->lock);
 171 
 172           nwritten =
 173             write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
 174 
 175           
 176           EnterCriticalSection (&filter->lock);
 177 
 178           if (nwritten < 0)
 179             {
 180               
 181 
 182               if (GetLastError () == ERROR_NO_DATA)
 183                 errno = EPIPE;
 184               filter->writer_errno = errno;
 185               filter->writer_terminated = true;
 186               break;
 187             }
 188           else if (nwritten > 0)
 189             {
 190               count -= nwritten;
 191               if (count == 0)
 192                 break;
 193               wbuf += nwritten;
 194             }
 195           else 
 196             {
 197               filter->writer_terminated = true;
 198               break;
 199             }
 200         }
 201     }
 202 }
 203 
 204 static void
 205 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
     
 206 {
 207   if (finish_reading)
 208     {
 209       LeaveCriticalSection (&filter->lock);
 210       WaitForSingleObject (filter->reader_thread_handle, INFINITE);
 211     }
 212   else
 213     TerminateThread (filter->reader_thread_handle, 1);
 214 
 215   CloseHandle (filter->reader_thread_handle);
 216   DeleteCriticalSection (&filter->lock);
 217 }
 218 
 219 #else
 220 
 221 
 222 static int
 223 filter_init (struct pipe_filter_gi *filter)
     
 224 {
 225 #if !(defined _WIN32 && ! defined __CYGWIN__)
 226   
 227 
 228 
 229   {
 230     struct sigaction sigpipe_action;
 231 
 232     sigpipe_action.sa_handler = SIG_IGN;
 233     sigpipe_action.sa_flags = 0;
 234     sigemptyset (&sigpipe_action.sa_mask);
 235     if (sigaction (SIGPIPE, &sigpipe_action, &filter->orig_sigpipe_action) < 0)
 236       abort ();
 237   }
 238 #endif
 239 
 240   
 241 
 242 
 243 
 244 
 245 
 246 
 247 
 248   {
 249     int fcntl_flags;
 250 
 251     if ((fcntl_flags = fcntl (filter->fd[1], F_GETFL, 0)) < 0
 252         || fcntl (filter->fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
 253         || (fcntl_flags = fcntl (filter->fd[0], F_GETFL, 0)) < 0
 254         || fcntl (filter->fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
 255       {
 256         if (filter->exit_on_error)
 257           error (EXIT_FAILURE, errno,
 258                  _("cannot set up nonblocking I/O to %s subprocess"),
 259                  filter->progname);
 260         return -1;
 261       }
 262   }
 263 
 264   FD_ZERO (&filter->readfds);
 265   FD_ZERO (&filter->writefds);
 266 
 267   return 0;
 268 }
 269 
 270 static void
 271 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
     
 272 {
 273   
 274 
 275 
 276 
 277 
 278 
 279   bool done_writing = (wbuf == NULL);
 280 
 281   if (!done_writing)
 282     {
 283       if (filter->writer_terminated || filter->reader_terminated)
 284         
 285         abort ();
 286     }
 287   else
 288     {
 289       if (filter->reader_terminated)
 290         return;
 291     }
 292 
 293   
 294 
 295   for (;;)
 296     {
 297       
 298 
 299       
 300 
 301       
 302 
 303       
 304 
 305 # if HAVE_SELECT
 306       int n, retval;
 307 
 308       
 309       n = 1;
 310       if (!filter->reader_terminated)
 311         {
 312           FD_SET (filter->fd[0], &filter->readfds);
 313           n = filter->fd[0] + 1;
 314         }
 315       if (!done_writing)
 316         {
 317           FD_SET (filter->fd[1], &filter->writefds);
 318           if (n <= filter->fd[1])
 319             n = filter->fd[1] + 1;
 320         }
 321       
 322 
 323 
 324       do
 325         retval = select (n,
 326                          (!filter->reader_terminated ? &filter->readfds : NULL),
 327                          (!done_writing ? &filter->writefds : NULL),
 328                          NULL, NULL);
 329       while (retval < 0 && errno == EINTR);
 330       n = retval;
 331 
 332       if (n < 0)
 333         {
 334           if (filter->exit_on_error)
 335             error (EXIT_FAILURE, errno,
 336                    _("communication with %s subprocess failed"),
 337                    filter->progname);
 338           filter->writer_errno = errno;
 339           filter->writer_terminated = true;
 340           break;
 341         }
 342 
 343       if (!done_writing && FD_ISSET (filter->fd[1], &filter->writefds))
 344         goto try_write;
 345       if (!filter->reader_terminated
 346           && FD_ISSET (filter->fd[0], &filter->readfds))
 347         goto try_read;
 348       
 349       abort ();
 350 # endif
 351 
 352       
 353 # if HAVE_SELECT
 354     try_write:
 355 # endif
 356       if (!done_writing)
 357         {
 358           ssize_t nwritten =
 359             write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
 360           if (nwritten < 0)
 361             {
 362               if (!IS_EAGAIN (errno))
 363                 {
 364                   if (filter->exit_on_error)
 365                     error (EXIT_FAILURE, errno,
 366                            _("write to %s subprocess failed"),
 367                            filter->progname);
 368                   filter->writer_errno = errno;
 369                   filter->writer_terminated = true;
 370                   break;
 371                 }
 372             }
 373           else if (nwritten > 0)
 374             {
 375               count -= nwritten;
 376               if (count == 0)
 377                 break;
 378               wbuf += nwritten;
 379             }
 380         }
 381 # if HAVE_SELECT
 382       continue;
 383 # endif
 384 
 385       
 386 # if HAVE_SELECT
 387     try_read:
 388 # endif
 389       if (!filter->reader_terminated)
 390         {
 391           size_t bufsize;
 392           void *buf = filter->prepare_read (&bufsize, filter->private_data);
 393           if (!(buf != NULL && bufsize > 0))
 394             
 395             abort ();
 396           {
 397             ssize_t nread =
 398               read (filter->fd[0], buf,
 399                     bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
 400             if (nread < 0)
 401               {
 402                 if (!IS_EAGAIN (errno))
 403                   {
 404                     if (filter->exit_on_error)
 405                       error (EXIT_FAILURE, errno,
 406                              _("read from %s subprocess failed"),
 407                              filter->progname);
 408                     filter->reader_errno = errno;
 409                     filter->reader_terminated = true;
 410                     break;
 411                   }
 412               }
 413             else if (nread > 0)
 414               filter->done_read (buf, nread, filter->private_data);
 415             else 
 416               {
 417                 filter->reader_terminated = true;
 418                 if (done_writing)
 419                   break;
 420               }
 421           }
 422       }
 423 # if HAVE_SELECT
 424       continue;
 425 # endif
 426     }
 427 }
 428 
 429 static void
 430 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
     
 431 {
 432   if (finish_reading)
 433     
 434     filter_loop (filter, NULL, 0);
 435 
 436   if (sigaction (SIGPIPE, &filter->orig_sigpipe_action, NULL) < 0)
 437     abort ();
 438 }
 439 
 440 #endif
 441 
 442 
 443 
 444 static void
 445 filter_terminate (struct pipe_filter_gi *filter)
     
 446 {
 447   if (!filter->exited)
 448     {
 449       
 450       close (filter->fd[1]);
 451       filter_cleanup (filter, !filter->reader_terminated);
 452       close (filter->fd[0]);
 453       filter->exitstatus =
 454         wait_subprocess (filter->child, filter->progname, true,
 455                          filter->null_stderr, true, filter->exit_on_error,
 456                          NULL);
 457       if (filter->exitstatus != 0 && filter->exit_on_error)
 458         error (EXIT_FAILURE, 0,
 459                _("subprocess %s terminated with exit code %d"),
 460                filter->progname, filter->exitstatus);
 461       filter->exited = true;
 462     }
 463 }
 464 
 465 
 466 
 467 
 468 
 469 static int
 470 filter_retcode (struct pipe_filter_gi *filter)
     
 471 {
 472   if (filter->writer_errno != 0)
 473     {
 474       errno = filter->writer_errno;
 475       return -1;
 476     }
 477   else if (filter->reader_errno != 0)
 478     {
 479       errno = filter->reader_errno;
 480       return -1;
 481     }
 482   else
 483     return filter->exitstatus;
 484 }
 485 
 486 struct pipe_filter_gi *
 487 pipe_filter_gi_create (const char *progname,
     
 488                        const char *prog_path, const char * const *prog_argv,
 489                        bool null_stderr, bool exit_on_error,
 490                        prepare_read_fn prepare_read,
 491                        done_read_fn done_read,
 492                        void *private_data)
 493 {
 494   struct pipe_filter_gi *filter;
 495 
 496   filter =
 497     (struct pipe_filter_gi *) xmalloc (sizeof (struct pipe_filter_gi));
 498 
 499   
 500   filter->child = create_pipe_bidi (progname, prog_path, prog_argv,
 501                                     NULL, null_stderr, true, exit_on_error,
 502                                     filter->fd);
 503   filter->progname = progname;
 504   filter->null_stderr = null_stderr;
 505   filter->exit_on_error = exit_on_error;
 506   filter->prepare_read = prepare_read;
 507   filter->done_read = done_read;
 508   filter->private_data = private_data;
 509   filter->exited = false;
 510   filter->exitstatus = 0;
 511   filter->writer_terminated = false;
 512   filter->writer_errno = 0;
 513   filter->reader_terminated = false;
 514   filter->reader_errno = 0;
 515 
 516   if (filter->child == -1)
 517     {
 518       
 519 
 520       filter->writer_errno = errno;
 521       filter->writer_terminated = true;
 522       filter->exited = true;
 523     }
 524   else if (filter_init (filter) < 0)
 525     filter_terminate (filter);
 526 
 527   return filter;
 528 }
 529 
 530 int
 531 pipe_filter_gi_write (struct pipe_filter_gi *filter,
     
 532                       const void *buf, size_t size)
 533 {
 534   if (buf == NULL)
 535     
 536     abort ();
 537 
 538   if (filter->exited)
 539     return filter_retcode (filter);
 540 
 541   if (size > 0)
 542     {
 543       filter_loop (filter, buf, size);
 544       if (filter->writer_terminated || filter->reader_terminated)
 545         {
 546           filter_terminate (filter);
 547           return filter_retcode (filter);
 548         }
 549     }
 550   return 0;
 551 }
 552 
 553 int
 554 pipe_filter_gi_close (struct pipe_filter_gi *filter)
     
 555 {
 556   int ret;
 557 
 558   filter_terminate (filter);
 559   ret = filter_retcode (filter);
 560   free (filter);
 561   return ret;
 562 }