This source file includes following definitions.
- start_wrapper
- _beginthreadex
- CloseHandle
- WaitForSingleObject
- WaitForMultipleObjects
- writer_thread_func
- reader_thread_func
- pipe_filter_ii_execute
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <config.h>
19
20 #include "pipe-filter.h"
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <stdbool.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #if defined _WIN32 && ! defined __CYGWIN__
29 # include <windows.h>
30 # include <process.h>
31 #elif defined __KLIBC__
32 # define INCL_DOS
33 # include <os2.h>
34
35
36
37 # define WINAPI
38
39 typedef struct _HANDLE
40 {
41 TID tid;
42 HEV hevDone;
43 unsigned int WINAPI (*start) (void *);
44 void *arg;
45 } *HANDLE;
46
47 typedef ULONG DWORD;
48
49 static void
50 start_wrapper (void *arg)
51 {
52 HANDLE h = (HANDLE) arg;
53
54 h->start (h->arg);
55
56 DosPostEventSem (h->hevDone);
57 _endthread ();
58 }
59
60 static HANDLE
61 _beginthreadex (void *s, unsigned n, unsigned int WINAPI (*start) (void *),
62 void *arg, unsigned fl, unsigned *th)
63 {
64 HANDLE h;
65
66 h = malloc (sizeof (*h));
67 if (!h)
68 return NULL;
69
70 if (DosCreateEventSem (NULL, &h->hevDone, 0, FALSE))
71 goto exit_free;
72
73 h->start = start;
74 h->arg = arg;
75
76 h->tid = _beginthread (start_wrapper, NULL, n, (void *) h);
77 if (h->tid == -1)
78 goto exit_close_event_sem;
79
80 return h;
81
82 exit_close_event_sem:
83 DosCloseEventSem (h->hevDone);
84
85 exit_free:
86 free (h);
87
88 return NULL;
89 }
90
91 static BOOL
92 CloseHandle (HANDLE h)
93 {
94 DosCloseEventSem (h->hevDone);
95 free (h);
96 }
97
98 # define _endthreadex(x) return (x)
99 # define TerminateThread(h, e) DosKillThread (h->tid)
100
101 # define GetLastError() -1
102
103 # ifndef ERROR_NO_DATA
104 # define ERROR_NO_DATA 232
105 # endif
106
107 # define INFINITE SEM_INDEFINITE_WAIT
108 # define WAIT_OBJECT_0 0
109
110 static DWORD
111 WaitForSingleObject (HANDLE h, DWORD ms)
112 {
113 return DosWaitEventSem (h->hevDone, ms) == 0 ? WAIT_OBJECT_0 : (DWORD) -1;
114 }
115
116 static DWORD
117 WaitForMultipleObjects (DWORD nCount, const HANDLE *pHandles, BOOL bWaitAll,
118 DWORD ms)
119 {
120 HMUX hmux;
121 PSEMRECORD psr;
122 ULONG ulUser;
123 ULONG rc = (ULONG) -1;
124 DWORD i;
125
126 psr = malloc (sizeof (*psr) * nCount);
127 if (!psr)
128 goto exit_return;
129
130 for (i = 0; i < nCount; ++i)
131 {
132 psr[i].hsemCur = (HSEM) pHandles[i]->hevDone;
133 psr[i].ulUser = WAIT_OBJECT_0 + i;
134 }
135
136 if (DosCreateMuxWaitSem (NULL, &hmux, nCount, psr,
137 bWaitAll ? DCMW_WAIT_ALL : DCMW_WAIT_ANY))
138 goto exit_free;
139
140 rc = DosWaitMuxWaitSem (hmux, ms, &ulUser);
141 DosCloseMuxWaitSem (hmux);
142
143 exit_free:
144 free (psr);
145
146 exit_return:
147 if (rc)
148 return (DWORD) -1;
149
150 return ulUser;
151 }
152 #else
153 # include <signal.h>
154 # include <sys/select.h>
155 #endif
156
157 #include "error.h"
158 #include "spawn-pipe.h"
159 #include "wait-process.h"
160 #include "gettext.h"
161
162 #define _(str) gettext (str)
163
164 #include "pipe-filter-aux.h"
165
166 #if (defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__
167
168 struct locals
169 {
170
171 prepare_write_fn prepare_write;
172 done_write_fn done_write;
173 prepare_read_fn prepare_read;
174 done_read_fn done_read;
175
176
177 void *private_data;
178 int fd[2];
179
180
181 volatile bool writer_terminated;
182 volatile int writer_errno;
183
184 volatile bool reader_terminated;
185 volatile int reader_errno;
186 };
187
188 static unsigned int WINAPI
189 writer_thread_func (void *thread_arg)
190 {
191 struct locals *l = (struct locals *) thread_arg;
192
193 for (;;)
194 {
195 size_t bufsize;
196 const void *buf = l->prepare_write (&bufsize, l->private_data);
197 if (buf != NULL)
198 {
199 ssize_t nwritten =
200 write (l->fd[1], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
201 if (nwritten < 0)
202 {
203
204
205 if (GetLastError () == ERROR_NO_DATA)
206 errno = EPIPE;
207 l->writer_errno = errno;
208 break;
209 }
210 else if (nwritten > 0)
211 l->done_write ((void *) buf, nwritten, l->private_data);
212 }
213 else
214 break;
215 }
216
217 l->writer_terminated = true;
218 _endthreadex (0);
219 abort ();
220 }
221
222 static unsigned int WINAPI
223 reader_thread_func (void *thread_arg)
224 {
225 struct locals *l = (struct locals *) thread_arg;
226
227 for (;;)
228 {
229 size_t bufsize;
230 void *buf = l->prepare_read (&bufsize, l->private_data);
231 if (!(buf != NULL && bufsize > 0))
232
233 abort ();
234 {
235 ssize_t nread =
236 read (l->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
237 if (nread < 0)
238 {
239 l->reader_errno = errno;
240 break;
241 }
242 else if (nread > 0)
243 l->done_read (buf, nread, l->private_data);
244 else
245 break;
246 }
247 }
248
249 l->reader_terminated = true;
250 _endthreadex (0);
251 abort ();
252 }
253
254 #endif
255
256 int
257 pipe_filter_ii_execute (const char *progname,
258 const char *prog_path, const char * const *prog_argv,
259 bool null_stderr, bool exit_on_error,
260 prepare_write_fn prepare_write,
261 done_write_fn done_write,
262 prepare_read_fn prepare_read,
263 done_read_fn done_read,
264 void *private_data)
265 {
266 pid_t child;
267 int fd[2];
268 #if !((defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__)
269 struct sigaction orig_sigpipe_action;
270 #endif
271
272
273 child = create_pipe_bidi (progname, prog_path, prog_argv,
274 NULL, null_stderr, true, exit_on_error,
275 fd);
276 if (child == -1)
277 return -1;
278
279 #if (defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__
280
281
282
283
284
285
286
287 {
288 struct locals l;
289 HANDLE handles[2];
290 #define writer_thread_handle handles[0]
291 #define reader_thread_handle handles[1]
292 bool writer_cleaned_up;
293 bool reader_cleaned_up;
294
295 l.prepare_write = prepare_write;
296 l.done_write = done_write;
297 l.prepare_read = prepare_read;
298 l.done_read = done_read;
299 l.private_data = private_data;
300 l.fd[0] = fd[0];
301 l.fd[1] = fd[1];
302 l.writer_terminated = false;
303 l.writer_errno = 0;
304 l.reader_terminated = false;
305 l.reader_errno = 0;
306
307 writer_thread_handle =
308 (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, &l, 0, NULL);
309 reader_thread_handle =
310 (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, &l, 0, NULL);
311 if (writer_thread_handle == NULL || reader_thread_handle == NULL)
312 {
313 if (exit_on_error)
314 error (EXIT_FAILURE, 0, _("creation of threads failed"));
315 if (reader_thread_handle != NULL)
316 CloseHandle (reader_thread_handle);
317 if (writer_thread_handle != NULL)
318 CloseHandle (writer_thread_handle);
319 goto fail;
320 }
321 writer_cleaned_up = false;
322 reader_cleaned_up = false;
323 for (;;)
324 {
325 DWORD ret;
326
327
328 if (writer_cleaned_up)
329 ret = WaitForSingleObject (reader_thread_handle, INFINITE);
330 else if (reader_cleaned_up)
331 ret = WaitForSingleObject (writer_thread_handle, INFINITE);
332 else
333 ret = WaitForMultipleObjects (2, handles, FALSE, INFINITE);
334 if (!(ret == WAIT_OBJECT_0 + 0 || ret == WAIT_OBJECT_0 + 1))
335 abort ();
336
337 if (l.writer_terminated)
338 {
339
340 l.writer_terminated = false;
341 CloseHandle (writer_thread_handle);
342 if (l.writer_errno)
343 {
344 if (exit_on_error)
345 error (EXIT_FAILURE, l.writer_errno,
346 _("write to %s subprocess failed"), progname);
347 if (!reader_cleaned_up)
348 {
349 TerminateThread (reader_thread_handle, 1);
350 CloseHandle (reader_thread_handle);
351 }
352 goto fail;
353 }
354
355 close (fd[1]);
356 writer_cleaned_up = true;
357 }
358 if (l.reader_terminated)
359 {
360
361 l.reader_terminated = false;
362 CloseHandle (reader_thread_handle);
363 if (l.reader_errno)
364 {
365 if (exit_on_error)
366 error (EXIT_FAILURE, l.reader_errno,
367 _("read from %s subprocess failed"), progname);
368 if (!writer_cleaned_up)
369 {
370 TerminateThread (writer_thread_handle, 1);
371 CloseHandle (writer_thread_handle);
372 }
373 goto fail;
374 }
375 reader_cleaned_up = true;
376 }
377 if (writer_cleaned_up && reader_cleaned_up)
378 break;
379 }
380 }
381 #else
382
383
384
385 {
386 struct sigaction sigpipe_action;
387
388 sigpipe_action.sa_handler = SIG_IGN;
389 sigpipe_action.sa_flags = 0;
390 sigemptyset (&sigpipe_action.sa_mask);
391 if (sigaction (SIGPIPE, &sigpipe_action, &orig_sigpipe_action) < 0)
392 abort ();
393 }
394
395 {
396 # if HAVE_SELECT
397 fd_set readfds;
398 fd_set writefds;
399 # endif
400 bool done_writing;
401
402
403
404
405
406
407
408
409
410 {
411 int fcntl_flags;
412
413 if ((fcntl_flags = fcntl (fd[1], F_GETFL, 0)) < 0
414 || fcntl (fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
415 || (fcntl_flags = fcntl (fd[0], F_GETFL, 0)) < 0
416 || fcntl (fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
417 {
418 if (exit_on_error)
419 error (EXIT_FAILURE, errno,
420 _("cannot set up nonblocking I/O to %s subprocess"),
421 progname);
422 goto fail;
423 }
424 }
425
426 # if HAVE_SELECT
427 FD_ZERO (&readfds);
428 FD_ZERO (&writefds);
429 # endif
430 done_writing = false;
431 for (;;)
432 {
433 # if HAVE_SELECT
434 int n, retval;
435
436 FD_SET (fd[0], &readfds);
437 n = fd[0] + 1;
438 if (!done_writing)
439 {
440 FD_SET (fd[1], &writefds);
441 if (n <= fd[1])
442 n = fd[1] + 1;
443 }
444
445
446
447
448 do
449 retval = select (n, &readfds, (!done_writing ? &writefds : NULL),
450 NULL, NULL);
451 while (retval < 0 && errno == EINTR);
452 n = retval;
453
454 if (n < 0)
455 {
456 if (exit_on_error)
457 error (EXIT_FAILURE, errno,
458 _("communication with %s subprocess failed"), progname);
459 goto fail;
460 }
461 if (!done_writing && FD_ISSET (fd[1], &writefds))
462 goto try_write;
463 if (FD_ISSET (fd[0], &readfds))
464 goto try_read;
465
466 abort ();
467 # endif
468
469
470 # if HAVE_SELECT
471 try_write:
472 # endif
473 if (!done_writing)
474 {
475 size_t bufsize;
476 const void *buf = prepare_write (&bufsize, private_data);
477 if (buf != NULL)
478 {
479
480
481
482
483
484
485
486 size_t attempt_to_write =
487 (bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
488 for (;;)
489 {
490 ssize_t nwritten = write (fd[1], buf, attempt_to_write);
491 if (nwritten < 0)
492 {
493 if (errno == EAGAIN)
494 {
495 attempt_to_write = attempt_to_write / 2;
496 if (attempt_to_write == 0)
497 break;
498 }
499 else if (!IS_EAGAIN (errno))
500 {
501 if (exit_on_error)
502 error (EXIT_FAILURE, errno,
503 _("write to %s subprocess failed"),
504 progname);
505 goto fail;
506 }
507 }
508 else
509 {
510 if (nwritten > 0)
511 done_write ((void *) buf, nwritten, private_data);
512 break;
513 }
514 }
515 }
516 else
517 {
518
519 close (fd[1]);
520 done_writing = true;
521 }
522 }
523 # if HAVE_SELECT
524 continue;
525 # endif
526
527
528 # if HAVE_SELECT
529 try_read:
530 # endif
531 {
532 size_t bufsize;
533 void *buf = prepare_read (&bufsize, private_data);
534 if (!(buf != NULL && bufsize > 0))
535
536 abort ();
537 {
538 ssize_t nread =
539 read (fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
540 if (nread < 0)
541 {
542 if (!IS_EAGAIN (errno))
543 {
544 if (exit_on_error)
545 error (EXIT_FAILURE, errno,
546 _("read from %s subprocess failed"), progname);
547 goto fail;
548 }
549 }
550 else if (nread > 0)
551 done_read (buf, nread, private_data);
552 else
553 {
554 if (done_writing)
555 break;
556 }
557 }
558 }
559 # if HAVE_SELECT
560 continue;
561 # endif
562 }
563 }
564
565
566 if (sigaction (SIGPIPE, &orig_sigpipe_action, NULL) < 0)
567 abort ();
568 #endif
569
570 close (fd[0]);
571
572
573 {
574 int exitstatus =
575 wait_subprocess (child, progname, false, null_stderr,
576 true, exit_on_error, NULL);
577 if (exitstatus != 0 && exit_on_error)
578 error (EXIT_FAILURE, 0, _("%s subprocess terminated with exit code %d"),
579 progname, exitstatus);
580 return exitstatus;
581 }
582
583 fail:
584 {
585 int saved_errno = errno;
586 close (fd[1]);
587 #if !((defined _WIN32 && ! defined __CYGWIN__) || defined __KLIBC__)
588 if (sigaction (SIGPIPE, &orig_sigpipe_action, NULL) < 0)
589 abort ();
590 #endif
591 close (fd[0]);
592 wait_subprocess (child, progname, true, true, true, false, NULL);
593 errno = saved_errno;
594 return -1;
595 }
596 }