This source file includes the following definitions.
- reader_thread_func
- filter_init
- filter_loop
- filter_cleanup
- filter_init
- filter_loop
- filter_cleanup
- filter_terminate
- filter_retcode
- pipe_filter_gi_create
- pipe_filter_gi_write
- pipe_filter_gi_close
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <config.h>
20
21 #include "pipe-filter.h"
22
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 #include <unistd.h>
29 #if defined _WIN32 && ! defined __CYGWIN__
30 # include <windows.h>
31 # include <process.h>
32 #else
33 # include <signal.h>
34 # include <sys/select.h>
35 #endif
36
37 #include "error.h"
38 #include "spawn-pipe.h"
39 #include "wait-process.h"
40 #include "xalloc.h"
41 #include "gettext.h"
42
43 #define _(str) gettext (str)
44
45 #include "pipe-filter-aux.h"
46
47 struct pipe_filter_gi
48 {
49
50 const char *progname;
51 bool null_stderr;
52 bool exit_on_error;
53 prepare_read_fn prepare_read;
54 done_read_fn done_read;
55 void *private_data;
56
57
58 pid_t child;
59 int fd[2];
60 bool exited;
61 int exitstatus;
62
63
64 volatile bool writer_terminated;
65 int writer_errno;
66
67 volatile bool reader_terminated;
68 volatile int reader_errno;
69
70 #if defined _WIN32 && ! defined __CYGWIN__
71 CRITICAL_SECTION lock;
72 HANDLE reader_thread_handle;
73 #else
74 struct sigaction orig_sigpipe_action;
75 fd_set readfds;
76 fd_set writefds;
77 #endif
78 };
79
80
81
82
83
84
/* Platform-specific setup performed after the subprocess has been created.
   Both implementations return 0 on success and -1 on failure.  */
static int filter_init (struct pipe_filter_gi *filter);

/* Platform-specific transfer loop: writes COUNT bytes starting at WBUF to
   the subprocess.  The non-Windows implementation also reads subprocess
   output in the same loop, and accepts WBUF == NULL to mean "only read".  */
static void filter_loop (struct pipe_filter_gi *filter,
                         const char *wbuf, size_t count);

/* Platform-specific teardown.  FINISH_READING selects whether the remaining
   subprocess output is still consumed before the resources are released.  */
static void filter_cleanup (struct pipe_filter_gi *filter,
                            bool finish_reading);
97
98
99 #if defined _WIN32 && ! defined __CYGWIN__
100
101
102 static unsigned int WINAPI
103 reader_thread_func (void *thread_arg)
104 {
105 struct pipe_filter_gi *filter = (struct pipe_filter_gi *) thread_arg;
106
107 for (;;)
108 {
109 size_t bufsize;
110 void *buf = filter->prepare_read (&bufsize, filter->private_data);
111 if (!(buf != NULL && bufsize > 0))
112
113 abort ();
114 {
115 ssize_t nread =
116 read (filter->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
117 EnterCriticalSection (&filter->lock);
118
119 if (filter->writer_terminated)
120 break;
121 if (nread < 0)
122 {
123 filter->reader_errno = errno;
124 break;
125 }
126 else if (nread > 0)
127 filter->done_read (buf, nread, filter->private_data);
128 else
129 break;
130 LeaveCriticalSection (&filter->lock);
131 }
132 }
133
134 filter->reader_terminated = true;
135 LeaveCriticalSection (&filter->lock);
136 _endthreadex (0);
137 abort ();
138 }
139
140 static int
141 filter_init (struct pipe_filter_gi *filter)
142 {
143 InitializeCriticalSection (&filter->lock);
144 EnterCriticalSection (&filter->lock);
145
146 filter->reader_thread_handle =
147 (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, filter,
148 0, NULL);
149
150 if (filter->reader_thread_handle == NULL)
151 {
152 if (filter->exit_on_error)
153 error (EXIT_FAILURE, 0, _("creation of reading thread failed"));
154 return -1;
155 }
156 else
157 return 0;
158 }
159
160 static void
161 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
162 {
163 if (!filter->writer_terminated)
164 {
165 for (;;)
166 {
167 ssize_t nwritten;
168
169
170 LeaveCriticalSection (&filter->lock);
171
172 nwritten =
173 write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
174
175
176 EnterCriticalSection (&filter->lock);
177
178 if (nwritten < 0)
179 {
180
181
182 if (GetLastError () == ERROR_NO_DATA)
183 errno = EPIPE;
184 filter->writer_errno = errno;
185 filter->writer_terminated = true;
186 break;
187 }
188 else if (nwritten > 0)
189 {
190 count -= nwritten;
191 if (count == 0)
192 break;
193 wbuf += nwritten;
194 }
195 else
196 {
197 filter->writer_terminated = true;
198 break;
199 }
200 }
201 }
202 }
203
204 static void
205 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
206 {
207 if (finish_reading)
208 {
209 LeaveCriticalSection (&filter->lock);
210 WaitForSingleObject (filter->reader_thread_handle, INFINITE);
211 }
212 else
213 TerminateThread (filter->reader_thread_handle, 1);
214
215 CloseHandle (filter->reader_thread_handle);
216 DeleteCriticalSection (&filter->lock);
217 }
218
219 #else
220
221
222 static int
223 filter_init (struct pipe_filter_gi *filter)
224 {
225 #if !(defined _WIN32 && ! defined __CYGWIN__)
226
227
228
229 {
230 struct sigaction sigpipe_action;
231
232 sigpipe_action.sa_handler = SIG_IGN;
233 sigpipe_action.sa_flags = 0;
234 sigemptyset (&sigpipe_action.sa_mask);
235 if (sigaction (SIGPIPE, &sigpipe_action, &filter->orig_sigpipe_action) < 0)
236 abort ();
237 }
238 #endif
239
240
241
242
243
244
245
246
247
248 {
249 int fcntl_flags;
250
251 if ((fcntl_flags = fcntl (filter->fd[1], F_GETFL, 0)) < 0
252 || fcntl (filter->fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
253 || (fcntl_flags = fcntl (filter->fd[0], F_GETFL, 0)) < 0
254 || fcntl (filter->fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
255 {
256 if (filter->exit_on_error)
257 error (EXIT_FAILURE, errno,
258 _("cannot set up nonblocking I/O to %s subprocess"),
259 filter->progname);
260 return -1;
261 }
262 }
263
264 FD_ZERO (&filter->readfds);
265 FD_ZERO (&filter->writefds);
266
267 return 0;
268 }
269
270 static void
271 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
272 {
273
274
275
276
277
278
279 bool done_writing = (wbuf == NULL);
280
281 if (!done_writing)
282 {
283 if (filter->writer_terminated || filter->reader_terminated)
284
285 abort ();
286 }
287 else
288 {
289 if (filter->reader_terminated)
290 return;
291 }
292
293
294
295 for (;;)
296 {
297
298
299
300
301
302
303
304
305 # if HAVE_SELECT
306 int n, retval;
307
308
309 n = 1;
310 if (!filter->reader_terminated)
311 {
312 FD_SET (filter->fd[0], &filter->readfds);
313 n = filter->fd[0] + 1;
314 }
315 if (!done_writing)
316 {
317 FD_SET (filter->fd[1], &filter->writefds);
318 if (n <= filter->fd[1])
319 n = filter->fd[1] + 1;
320 }
321
322
323
324 do
325 retval = select (n,
326 (!filter->reader_terminated ? &filter->readfds : NULL),
327 (!done_writing ? &filter->writefds : NULL),
328 NULL, NULL);
329 while (retval < 0 && errno == EINTR);
330 n = retval;
331
332 if (n < 0)
333 {
334 if (filter->exit_on_error)
335 error (EXIT_FAILURE, errno,
336 _("communication with %s subprocess failed"),
337 filter->progname);
338 filter->writer_errno = errno;
339 filter->writer_terminated = true;
340 break;
341 }
342
343 if (!done_writing && FD_ISSET (filter->fd[1], &filter->writefds))
344 goto try_write;
345 if (!filter->reader_terminated
346 && FD_ISSET (filter->fd[0], &filter->readfds))
347 goto try_read;
348
349 abort ();
350 # endif
351
352
353 # if HAVE_SELECT
354 try_write:
355 # endif
356 if (!done_writing)
357 {
358 ssize_t nwritten =
359 write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
360 if (nwritten < 0)
361 {
362 if (!IS_EAGAIN (errno))
363 {
364 if (filter->exit_on_error)
365 error (EXIT_FAILURE, errno,
366 _("write to %s subprocess failed"),
367 filter->progname);
368 filter->writer_errno = errno;
369 filter->writer_terminated = true;
370 break;
371 }
372 }
373 else if (nwritten > 0)
374 {
375 count -= nwritten;
376 if (count == 0)
377 break;
378 wbuf += nwritten;
379 }
380 }
381 # if HAVE_SELECT
382 continue;
383 # endif
384
385
386 # if HAVE_SELECT
387 try_read:
388 # endif
389 if (!filter->reader_terminated)
390 {
391 size_t bufsize;
392 void *buf = filter->prepare_read (&bufsize, filter->private_data);
393 if (!(buf != NULL && bufsize > 0))
394
395 abort ();
396 {
397 ssize_t nread =
398 read (filter->fd[0], buf,
399 bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
400 if (nread < 0)
401 {
402 if (!IS_EAGAIN (errno))
403 {
404 if (filter->exit_on_error)
405 error (EXIT_FAILURE, errno,
406 _("read from %s subprocess failed"),
407 filter->progname);
408 filter->reader_errno = errno;
409 filter->reader_terminated = true;
410 break;
411 }
412 }
413 else if (nread > 0)
414 filter->done_read (buf, nread, filter->private_data);
415 else
416 {
417 filter->reader_terminated = true;
418 if (done_writing)
419 break;
420 }
421 }
422 }
423 # if HAVE_SELECT
424 continue;
425 # endif
426 }
427 }
428
429 static void
430 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
431 {
432 if (finish_reading)
433
434 filter_loop (filter, NULL, 0);
435
436 if (sigaction (SIGPIPE, &filter->orig_sigpipe_action, NULL) < 0)
437 abort ();
438 }
439
440 #endif
441
442
443
444 static void
445 filter_terminate (struct pipe_filter_gi *filter)
446 {
447 if (!filter->exited)
448 {
449
450 close (filter->fd[1]);
451 filter_cleanup (filter, !filter->reader_terminated);
452 close (filter->fd[0]);
453 filter->exitstatus =
454 wait_subprocess (filter->child, filter->progname, true,
455 filter->null_stderr, true, filter->exit_on_error,
456 NULL);
457 if (filter->exitstatus != 0 && filter->exit_on_error)
458 error (EXIT_FAILURE, 0,
459 _("subprocess %s terminated with exit code %d"),
460 filter->progname, filter->exitstatus);
461 filter->exited = true;
462 }
463 }
464
465
466
467
468
469 static int
470 filter_retcode (struct pipe_filter_gi *filter)
471 {
472 if (filter->writer_errno != 0)
473 {
474 errno = filter->writer_errno;
475 return -1;
476 }
477 else if (filter->reader_errno != 0)
478 {
479 errno = filter->reader_errno;
480 return -1;
481 }
482 else
483 return filter->exitstatus;
484 }
485
486 struct pipe_filter_gi *
487 pipe_filter_gi_create (const char *progname,
488 const char *prog_path, const char * const *prog_argv,
489 bool null_stderr, bool exit_on_error,
490 prepare_read_fn prepare_read,
491 done_read_fn done_read,
492 void *private_data)
493 {
494 struct pipe_filter_gi *filter;
495
496 filter =
497 (struct pipe_filter_gi *) xmalloc (sizeof (struct pipe_filter_gi));
498
499
500 filter->child = create_pipe_bidi (progname, prog_path, prog_argv,
501 NULL, null_stderr, true, exit_on_error,
502 filter->fd);
503 filter->progname = progname;
504 filter->null_stderr = null_stderr;
505 filter->exit_on_error = exit_on_error;
506 filter->prepare_read = prepare_read;
507 filter->done_read = done_read;
508 filter->private_data = private_data;
509 filter->exited = false;
510 filter->exitstatus = 0;
511 filter->writer_terminated = false;
512 filter->writer_errno = 0;
513 filter->reader_terminated = false;
514 filter->reader_errno = 0;
515
516 if (filter->child == -1)
517 {
518
519
520 filter->writer_errno = errno;
521 filter->writer_terminated = true;
522 filter->exited = true;
523 }
524 else if (filter_init (filter) < 0)
525 filter_terminate (filter);
526
527 return filter;
528 }
529
530 int
531 pipe_filter_gi_write (struct pipe_filter_gi *filter,
532 const void *buf, size_t size)
533 {
534 if (buf == NULL)
535
536 abort ();
537
538 if (filter->exited)
539 return filter_retcode (filter);
540
541 if (size > 0)
542 {
543 filter_loop (filter, buf, size);
544 if (filter->writer_terminated || filter->reader_terminated)
545 {
546 filter_terminate (filter);
547 return filter_retcode (filter);
548 }
549 }
550 return 0;
551 }
552
/* Finish the communication with the subprocess of FILTER, free FILTER, and
   return filter_retcode's result: -1 with errno set if an error was
   recorded, otherwise the subprocess exit status.  FILTER must not be used
   after this call.  */
int
pipe_filter_gi_close (struct pipe_filter_gi *filter)
{
  int status;

  filter_terminate (filter);
  /* Fetch the result before the object is released.  */
  status = filter_retcode (filter);
  free (filter);
  return status;
}