00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00036 #include "blocxx/BLOCXX_config.h"
00037 #include "blocxx/Exec.hpp"
00038 #include "blocxx/Format.hpp"
00039 #include "blocxx/Assertion.hpp"
00040 #include "blocxx/PosixUnnamedPipe.hpp"
00041 #include "blocxx/Array.hpp"
00042 #include "blocxx/IOException.hpp"
00043 #include "blocxx/Thread.hpp"
00044 #include "blocxx/Select.hpp"
00045 #include "blocxx/ExceptionIds.hpp"
00046 #include "blocxx/IntrusiveCountableBase.hpp"
00047 #include "blocxx/DateTime.hpp"
00048 #include "blocxx/AutoPtr.hpp"
00049
00050 #include <map>
00051
00052 extern "C"
00053 {
00054 #ifdef BLOCXX_HAVE_SYS_RESOURCE_H
00055 #include <sys/resource.h>
00056 #endif
00057 #ifndef BLOCXX_WIN32
00058 #include <unistd.h>
00059 #include <sys/wait.h>
00060 #include <fcntl.h>
00061 #endif
00062 #include <errno.h>
00063 #include <stdio.h>
00064 #include <signal.h>
00065 }
00066
00067 #include <cerrno>
00068 #include <iostream>
00069
00070
00071 #ifndef NSIG
00072 #define NSIG 64
00073 #endif
00074
00075 namespace BLOCXX_NAMESPACE
00076 {
00077
00078 using std::cerr;
00079 using std::endl;
00080 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecTimeout);
00081 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecBufferFull);
00082 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecError);
00083
00084 #ifndef BLOCXX_WIN32
00085 class PopenStreamsImpl : public IntrusiveCountableBase
00086 {
00087 public:
00088 PopenStreamsImpl();
00089 ~PopenStreamsImpl();
00090 UnnamedPipeRef in() const;
00091 void in(const UnnamedPipeRef& pipe);
00092 UnnamedPipeRef out() const;
00093 void out(const UnnamedPipeRef& pipe);
00094 UnnamedPipeRef err() const;
00095 void err(const UnnamedPipeRef& pipe);
00096 Array<UnnamedPipeRef> extraPipes() const;
00097 void setExtraPipes(const Array<UnnamedPipeRef>& pipes);
00098
00099 pid_t pid();
00100 void pid(pid_t newPid);
00101 int getExitStatus();
00102 int getExitStatus(UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term);
00103 void setProcessStatus(int ps)
00104 {
00105 m_processstatus = ps;
00106 }
00107 private:
00108 UnnamedPipeRef m_in;
00109 UnnamedPipeRef m_out;
00110 UnnamedPipeRef m_err;
00111 Array<UnnamedPipeRef> m_extraPipes;
00112 pid_t m_pid;
00113 int m_processstatus;
00114 };
00116 PopenStreamsImpl::PopenStreamsImpl()
00117 : m_pid(-1)
00118 , m_processstatus(-1)
00119 {
00120 }
00122 UnnamedPipeRef PopenStreamsImpl::in() const
00123 {
00124 return m_in;
00125 }
00127 void PopenStreamsImpl::in(const UnnamedPipeRef& pipe)
00128 {
00129 m_in = pipe;
00130 }
00132 UnnamedPipeRef PopenStreamsImpl::out() const
00133 {
00134 return m_out;
00135 }
00137 void PopenStreamsImpl::out(const UnnamedPipeRef& pipe)
00138 {
00139 m_out = pipe;
00140 }
00142 UnnamedPipeRef PopenStreamsImpl::err() const
00143 {
00144 return m_err;
00145 }
00147 void PopenStreamsImpl::err(const UnnamedPipeRef& pipe)
00148 {
00149 m_err = pipe;
00150 }
00152 Array<UnnamedPipeRef> PopenStreamsImpl::extraPipes() const
00153 {
00154 return m_extraPipes;
00155 }
00157 void PopenStreamsImpl::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00158 {
00159 m_extraPipes = pipes;
00160 }
00162 pid_t PopenStreamsImpl::pid()
00163 {
00164 return m_pid;
00165 }
00167 void PopenStreamsImpl::pid(pid_t newPid)
00168 {
00169 m_pid = newPid;
00170 }
00172 static inline ProcId safeWaitPid(ProcId pid, int* status, int options)
00173 {
00174
00175
00176 int localReturnValue = -1;
00177 pid_t returnedPID = ::waitpid(pid, &localReturnValue, options);
00178 if( returnedPID > 0 )
00179 {
00180 *status = localReturnValue;
00181 }
00182 return returnedPID;
00183 }
00184
00186 static ProcId noIntrWaitPid(ProcId pid, int* status, int options)
00187 {
00188 pid_t waitpidrv;
00189 do
00190 {
00191 Thread::testCancel();
00192 waitpidrv = safeWaitPid(pid, status, options);
00193 } while (waitpidrv == -1 && errno == EINTR);
00194 return waitpidrv;
00195 }
00196
00198 static inline void
00199 milliSleep(UInt32 milliSeconds)
00200 {
00201 Thread::sleep(milliSeconds);
00202 }
00204 static inline void
00205 secSleep(UInt32 seconds)
00206 {
00207 Thread::sleep(seconds * 1000);
00208 }
00210 static bool
00211 timedWaitPid(ProcId pid, int * pstatus, UInt32 wait_time)
00212 {
00213 UInt32 const N = 154;
00214 UInt32 const M = 128;
00215 UInt32 const MAXPERIOD = 5000;
00216 UInt32 period = 100;
00217 UInt32 t = 0;
00218 ProcId waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00219 while (t < wait_time && waitpidrv == 0) {
00220 milliSleep(period);
00221 t += period;
00222 period *= N;
00223 period /= M;
00224 if (period > MAXPERIOD)
00225 {
00226 period = MAXPERIOD;
00227 }
00228 waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00229 }
00230 if (waitpidrv < 0) {
00231 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "waitpid() failed.");
00232 }
00233 return waitpidrv != 0;
00234 }
00235
00237
00238
00239
00240 static bool killWait(
00241 ProcId pid, int * pstatus, UInt32 wait_time, int sig, char const * signame
00242 )
00243 {
00244 if (::kill(pid, sig) == -1) {
00245
00246 int errnum = errno;
00247
00248 if (noIntrWaitPid(pid, pstatus, WNOHANG) > 0) {
00249 return true;
00250 }
00251 else {
00252 Format fmt("Failed sending %1 to process %2.", signame, pid);
00253 char const * msg = fmt.c_str();
00254 errno = errnum;
00255 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, msg);
00256 }
00257 }
00258 return timedWaitPid(pid, pstatus, wait_time);
00259 }
00260
00262 int PopenStreamsImpl::getExitStatus()
00263 {
00264 return this->getExitStatus(0, 10 *1000, 10 * 1000);
00265 }
00266
00268 int PopenStreamsImpl::getExitStatus(
00269 UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term)
00270 {
00271 if (m_pid < 0)
00272 {
00273 return m_processstatus;
00274 }
00275 if (m_pid == ::getpid())
00276 {
00277 BLOCXX_THROW(ExecErrorException, "PopenStreamsImpl::getExitStatus: m_pid == getpid()");
00278 }
00279
00280 ProcId pid = m_pid;
00281 m_pid = -1;
00282 int * pstatus = &m_processstatus;
00283
00284
00285 wait_initial *= 1000;
00286 wait_close *= 1000;
00287 wait_term *= 1000;
00288
00289 if (wait_initial > 0 && timedWaitPid(pid, pstatus, wait_initial))
00290 {
00291 return m_processstatus;
00292 }
00293
00294 if (wait_close > 0)
00295 {
00296
00297
00298
00299
00300 UnnamedPipeRef upr;
00301 if (upr = in())
00302 {
00303 upr->close();
00304 }
00305 if (upr = out())
00306 {
00307 upr->close();
00308 }
00309 if (upr = err())
00310 {
00311 upr->close();
00312 }
00313 if (timedWaitPid(pid, pstatus, wait_close))
00314 {
00315 return m_processstatus;
00316 }
00317 }
00318
00319 if (wait_term > 0 && killWait(pid, pstatus, wait_term, SIGTERM, "SIGTERM"))
00320 {
00321 return m_processstatus;
00322 }
00323 if (!killWait(pid, pstatus, 5000, SIGKILL, "SIGKILL")) {
00324 BLOCXX_THROW(
00325 ExecErrorException, "PopenStreamsImpl::getExitStatus: Child process has not exited after sending it a SIGKILL."
00326 );
00327 }
00328 return m_processstatus;
00329 }
00331 PopenStreamsImpl::~PopenStreamsImpl()
00332 {
00333 try
00334 {
00335
00336 getExitStatus();
00337 }
00338 catch (...)
00339 {
00340 }
00341 }
00342
00344 PopenStreams::PopenStreams()
00345 : m_impl(new PopenStreamsImpl)
00346 {
00347 }
00349 PopenStreams::~PopenStreams()
00350 {
00351 }
00353 UnnamedPipeRef PopenStreams::in() const
00354 {
00355 return m_impl->in();
00356 }
00358 void PopenStreams::in(const UnnamedPipeRef& pipe)
00359 {
00360 m_impl->in(pipe);
00361 }
00363 UnnamedPipeRef PopenStreams::out() const
00364 {
00365 return m_impl->out();
00366 }
00368 void PopenStreams::out(const UnnamedPipeRef& pipe)
00369 {
00370 m_impl->out(pipe);
00371 }
00373 UnnamedPipeRef PopenStreams::err() const
00374 {
00375 return m_impl->err();
00376 }
00378 void PopenStreams::err(const UnnamedPipeRef& pipe)
00379 {
00380 m_impl->err(pipe);
00381 }
00383 Array<UnnamedPipeRef> PopenStreams::extraPipes() const
00384 {
00385 return m_impl->extraPipes();
00386 }
00388 void PopenStreams::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00389 {
00390 m_impl->setExtraPipes(pipes);
00391 }
00393 pid_t PopenStreams::pid() const
00394 {
00395 return m_impl->pid();
00396 }
00398 void PopenStreams::pid(pid_t newPid)
00399 {
00400 m_impl->pid(newPid);
00401 }
00403 int PopenStreams::getExitStatus()
00404 {
00405 return m_impl->getExitStatus();
00406 }
00408 int PopenStreams::getExitStatus(UInt32 wait0, UInt32 wait1, UInt32 wait2)
00409 {
00410 return m_impl->getExitStatus(wait0, wait1, wait2);
00411 }
00413 void PopenStreams::setProcessStatus(int ps)
00414 {
00415 m_impl->setProcessStatus(ps);
00416 }
00418 PopenStreams::PopenStreams(const PopenStreams& src)
00419 : m_impl(src.m_impl)
00420 {
00421 }
00423 PopenStreams& PopenStreams::operator=(const PopenStreams& src)
00424 {
00425 m_impl = src.m_impl;
00426 return *this;
00427 }
00428
00430 bool operator==(const PopenStreams& x, const PopenStreams& y)
00431 {
00432 return x.m_impl == y.m_impl;
00433 }
00434
00436 namespace Exec
00437 {
00438
00440 int
00441 safeSystem(const Array<String>& command, const EnvVars& envVars)
00442 {
00443 return safeSystem(command, envVars.getenvp());
00444 }
00445
00447 int
00448 safeSystem(const Array<String>& command, const char* const envp[])
00449 {
00450 int status;
00451 pid_t pid;
00452 if (command.size() == 0)
00453 {
00454 return 1;
00455 }
00456
00457
00458 AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00459 for (size_t i = 0; i < command.size(); i++)
00460 {
00461 argv[i] = command[i].c_str();
00462 }
00463 argv[command.size()] = 0;
00464
00465 pid = ::fork();
00466 if (pid == -1)
00467 {
00468 return -1;
00469 }
00470 if (pid == 0)
00471 {
00472 try
00473 {
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492 sigset_t emptymask;
00493 sigemptyset(&emptymask);
00494 ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00495
00496 for (size_t sig = 1; sig <= NSIG; ++sig)
00497 {
00498 struct sigaction temp;
00499 sigaction(sig, 0, &temp);
00500 temp.sa_handler = SIG_DFL;
00501 sigaction(sig, &temp, NULL);
00502 }
00503
00504
00505 rlimit rl;
00506 int i = sysconf(_SC_OPEN_MAX);
00507 if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00508 {
00509 if ( i < 0 )
00510 {
00511 i = rl.rlim_max;
00512 }
00513 else
00514 {
00515 i = std::min<int>(rl.rlim_max, i);
00516 }
00517 }
00518 while (i > 2)
00519 {
00520
00521 ::fcntl(i, F_SETFD, FD_CLOEXEC);
00522 i--;
00523 }
00524
00525 int rval;
00526 if (envp)
00527 {
00528 rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00529 }
00530 else
00531 {
00532 rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00533 }
00534 cerr << Format( "Exec::safeSystem: execv failed for program "
00535 "%1, rval is %2", argv[0], rval);
00536 }
00537 catch (...)
00538 {
00539 cerr << "something threw an exception after fork()!";
00540 }
00541 _exit(127);
00542 }
00543 do
00544 {
00545 Thread::testCancel();
00546 if (waitpid(pid, &status, 0) == -1)
00547 {
00548 if (errno != EINTR)
00549 {
00550 return -1;
00551 }
00552 }
00553 else
00554 {
00555 return WEXITSTATUS(status);
00556 }
00557 } while (1);
00558 }
00559
00561 PopenStreams
00562 safePopen(const Array<String>& command,
00563 const String& initialInput)
00564 {
00565 PopenStreams retval = safePopen(command);
00566
00567 if (initialInput != "")
00568 {
00569 if (retval.in()->write(initialInput.c_str(), initialInput.length()) == -1)
00570 {
00571 BLOCXX_THROW_ERRNO_MSG(IOException, "Exec::safePopen: Failed writing input to process");
00572 }
00573 }
00574
00575 return retval;
00576 }
00577
00579 PopenStreams
00580 safePopen(const Array<String>& command, const EnvVars& envVars)
00581 {
00582 return safePopen(command, envVars.getenvp());
00583 }
00584
00586 PopenStreams
00587 safePopen(const Array<String>& command, const char* const envp[])
00588 {
00589
00590
00591
00592 const int UNKNOWN_EXCEPTION = -2000;
00593
00594 if (command.size() == 0)
00595 {
00596 BLOCXX_THROW(ExecErrorException, "Exec::safePopen: command is empty");
00597 }
00598
00599 PopenStreams retval;
00600 retval.in( UnnamedPipe::createUnnamedPipe() );
00601 UnnamedPipeRef upipeOut = UnnamedPipe::createUnnamedPipe();
00602 retval.out( upipeOut );
00603 UnnamedPipeRef upipeErr = UnnamedPipe::createUnnamedPipe();
00604 retval.err( upipeErr );
00605
00606 UnnamedPipeRef execErrorPipe = UnnamedPipe::createUnnamedPipe();
00607
00608
00609 AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00610 for (size_t i = 0; i < command.size(); i++)
00611 {
00612 argv[i] = command[i].c_str();
00613 }
00614 argv[command.size()] = 0;
00615
00616 pid_t forkrv = ::fork();
00617 if (forkrv == -1)
00618 {
00619 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::safePopen: fork() failed");
00620 }
00621 if (forkrv == 0)
00622 {
00623 int execErrorFd = -1;
00624 try
00625 {
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645 sigset_t emptymask;
00646 sigemptyset(&emptymask);
00647 ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00648
00649 for (size_t sig = 1; sig <= NSIG; ++sig)
00650 {
00651 struct sigaction temp;
00652 sigaction(sig, 0, &temp);
00653 temp.sa_handler = SIG_DFL;
00654 sigaction(sig, &temp, NULL);
00655 }
00656
00657
00658 close(0);
00659 close(1);
00660 close(2);
00661
00662
00663 UnnamedPipeRef foo1 = retval.in();
00664 PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00665
00666 UnnamedPipeRef foo2 = retval.out();
00667 PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00668
00669 UnnamedPipeRef foo3 = retval.err();
00670 PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00671
00672
00673 BLOCXX_ASSERT(in);
00674 BLOCXX_ASSERT(out);
00675 BLOCXX_ASSERT(err);
00676
00677 int rv = dup2(in->getInputHandle(), 0);
00678 BLOCXX_ASSERT(rv != -1);
00679 rv = dup2(out->getOutputHandle(), 1);
00680 BLOCXX_ASSERT(rv != -1);
00681 rv = dup2(err->getOutputHandle(), 2);
00682 BLOCXX_ASSERT(rv != -1);
00683
00684
00685 PosixUnnamedPipeRef execError = execErrorPipe.cast_to<PosixUnnamedPipe>();
00686 BLOCXX_ASSERT(execError);
00687 execErrorFd = execError->getOutputHandle();
00688
00689
00690
00691 rlimit rl;
00692 int i = sysconf(_SC_OPEN_MAX);
00693 if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00694 {
00695 if ( i < 0 )
00696 {
00697 i = rl.rlim_max;
00698 }
00699 else
00700 {
00701 i = std::min<int>(rl.rlim_max, i);
00702 }
00703 }
00704 while (i > 2)
00705 {
00706
00707 ::fcntl(i, F_SETFD, FD_CLOEXEC);
00708 i--;
00709 }
00710
00711 int rval = 0;
00712 if (envp)
00713 {
00714 rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00715 }
00716 else
00717 {
00718 rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00719 }
00720
00721 int lerrno = errno;
00722 write(execErrorFd, &lerrno, sizeof(lerrno));
00723 }
00724 catch (...)
00725 {
00726 int errorVal = UNKNOWN_EXCEPTION;
00727 write(execErrorFd, &errorVal, sizeof(errorVal));
00728 }
00729 _exit(127);
00730 }
00731
00732
00733 retval.pid (forkrv);
00734
00735
00736 UnnamedPipeRef foo1 = retval.in();
00737 PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00738 UnnamedPipeRef foo2 = retval.out();
00739 PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00740 UnnamedPipeRef foo3 = retval.err();
00741 PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00742 BLOCXX_ASSERT(in);
00743 BLOCXX_ASSERT(out);
00744 BLOCXX_ASSERT(err);
00745
00746 in->closeInputHandle();
00747 out->closeOutputHandle();
00748 err->closeOutputHandle();
00749
00750 PosixUnnamedPipeRef execErrorPosixPipe = execErrorPipe.cast_to<PosixUnnamedPipe>();
00751 BLOCXX_ASSERT(execErrorPosixPipe);
00752
00753 execErrorPosixPipe->closeOutputHandle();
00754
00755 const int SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC = 10;
00756 execErrorPipe->setReadTimeout(SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC);
00757
00758 int childErrorCode = 0;
00759 int bytesRead = execErrorPipe->read(&childErrorCode, sizeof(childErrorCode));
00760
00761 if (bytesRead == ETIMEDOUT)
00762 {
00763
00764
00765 kill(forkrv, SIGKILL);
00766 BLOCXX_THROW(ExecErrorException, "Exec::safePopen: timed out waiting for child process to exec()");
00767 }
00768 if (bytesRead > 0)
00769 {
00770
00771 if (childErrorCode == UNKNOWN_EXCEPTION)
00772 {
00773 BLOCXX_THROW(ExecErrorException, "Exec::safePopen: child process caught an exception before reaching exec()");
00774 }
00775 else
00776 {
00777 errno = childErrorCode;
00778 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, Format("Exec::safePopen: child process failed running exec() process = %1", command[0]));
00779 }
00780 }
00781
00782 return retval;
00783 }
00784
00785 namespace
00786 {
00787
// Smaller of two values. The whole expansion is parenthesised so that the
// macro can be used safely inside a larger expression. Each argument may
// be evaluated twice, so do not pass expressions with side effects.
#ifndef BLOCXX_MIN
#define BLOCXX_MIN(x, y) ((x) < (y) ? (x) : (y))
#endif
00791
00793 class StringOutputGatherer : public OutputCallback
00794 {
00795 public:
00796 StringOutputGatherer(String& output, int outputLimit)
00797 : m_stdout(output)
00798 , m_stderr(output)
00799 , m_outputLimit(outputLimit)
00800 {
00801 }
00802 StringOutputGatherer(String& stdOutput, String& errOutput, int outputLimit)
00803 : m_stdout(stdOutput)
00804 , m_stderr(errOutput)
00805 , m_outputLimit(outputLimit)
00806 {
00807 }
00808 private:
00809 virtual void doHandleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00810 {
00811 String& m_output(outputSource == E_STDOUT ? m_stdout : m_stderr);
00812
00813 if (m_outputLimit >= 0 && m_output.length() + dataLen > static_cast<size_t>(m_outputLimit))
00814 {
00815
00816 int lentocopy = BLOCXX_MIN(m_outputLimit - m_output.length(), dataLen);
00817 if (lentocopy >= 0)
00818 {
00819 m_output += String(data, lentocopy);
00820 }
00821 BLOCXX_THROW(ExecBufferFullException, "Exec::StringOutputGatherer::doHandleData(): buffer full");
00822 }
00823
00824 m_output += data;
00825 }
00826 String& m_stdout;
00827 String& m_stderr;
00828 int m_outputLimit;
00829 };
00830
00832 class SingleStringInputCallback : public InputCallback
00833 {
00834 public:
00835 SingleStringInputCallback(const String& s)
00836 : m_s(s)
00837 {
00838 }
00839 private:
00840 virtual void doGetData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00841 {
00842 if (m_s.length() > 0)
00843 {
00844 inputBuffer.insert(inputBuffer.end(), m_s.c_str(), m_s.c_str() + m_s.length());
00845 m_s.erase();
00846 }
00847 else if (theStream.in()->isOpen())
00848 {
00849 theStream.in()->close();
00850 }
00851 }
00852 String m_s;
00853 };
00854
00855 }
00856
00858 void
00859 executeProcessAndGatherOutput(const Array<String>& command,
00860 String& output, int& processStatus,
00861 int timeoutSecs, int outputLimit, const String& input)
00862 {
00863 executeProcessAndGatherOutput(command, output, processStatus,
00864 EnvVars(EnvVars::E_CURRENT_ENVIRONMENT),
00865 timeoutSecs, outputLimit, input);
00866 }
00867
00869 void executeProcessAndGatherOutput(
00870 const Array<String>& command,
00871 String& output,
00872 int& processStatus,
00873 const EnvVars& envVars,
00874 int timeoutSecs,
00875 int outputLimit,
00876 const String& input)
00877 {
00878 processStatus = -1;
00879 Array<PopenStreams> streams;
00880 streams.push_back(safePopen(command, envVars));
00881 Array<ProcessStatus> processStatuses(1);
00882 SingleStringInputCallback singleStringInputCallback(input);
00883
00884 StringOutputGatherer gatherer(output, outputLimit);
00885 processInputOutput(gatherer, streams, processStatuses,
00886 singleStringInputCallback, timeoutSecs);
00887
00888 if (processStatuses[0].hasExited())
00889 {
00890 processStatus = processStatuses[0].getStatus();
00891 }
00892 else
00893 {
00894 processStatus = streams[0].getExitStatus();
00895 }
00896 }
00897
00899 void executeProcessAndGatherOutput(
00900 const Array<String>& command,
00901 String& stdOutput,
00902 String& errOutput,
00903 int& processStatus,
00904 const EnvVars& envVars,
00905 int timeoutSecs,
00906 int outputLimit,
00907 const String& input)
00908 {
00909 processStatus = -1;
00910 Array<PopenStreams> streams;
00911 streams.push_back(safePopen(command, envVars));
00912 Array<ProcessStatus> processStatuses(1);
00913 SingleStringInputCallback singleStringInputCallback(input);
00914
00915 StringOutputGatherer gatherer(stdOutput, errOutput, outputLimit);
00916 processInputOutput(gatherer, streams, processStatuses,
00917 singleStringInputCallback, timeoutSecs);
00918
00919 if (processStatuses[0].hasExited())
00920 {
00921 processStatus = processStatuses[0].getStatus();
00922 }
00923 else
00924 {
00925 processStatus = streams[0].getExitStatus();
00926 }
00927 }
00928
00930 void
00931 gatherOutput(String& output, PopenStreams& stream, int& processStatus, int timeoutSecs, int outputLimit)
00932 {
00933 Array<PopenStreams> streams;
00934 streams.push_back(stream);
00935 Array<ProcessStatus> processStatuses(1);
00936
00937 StringOutputGatherer gatherer(output, outputLimit);
00938 SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(String());
00939 processInputOutput(gatherer, streams, processStatuses, singleStringInputCallback, timeoutSecs);
00940 if (processStatuses[0].hasExited())
00941 {
00942 processStatus = processStatuses[0].getStatus();
00943 }
00944 }
00945
00947 OutputCallback::~OutputCallback()
00948 {
00949
00950 }
00951
00953 void
00954 OutputCallback::handleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00955 {
00956 doHandleData(data, dataLen, outputSource, theStream, streamIndex, inputBuffer);
00957 }
00958
00960 InputCallback::~InputCallback()
00961 {
00962 }
00963
00965 void
00966 InputCallback::getData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00967 {
00968 doGetData(inputBuffer, theStream, streamIndex);
00969 }
00970
00971 namespace
00972 {
// Per-process bookkeeping used by processInputOutput(): which of the three
// pipes are still considered open, and how many bytes of input are queued
// for the process's stdin.
struct ProcessOutputState
{
	bool inIsOpen;
	bool outIsOpen;
	bool errIsOpen;
	size_t availableDataLen;

	// All pipes start out open and no input is queued.
	ProcessOutputState()
		: inIsOpen(true), outIsOpen(true), errIsOpen(true), availableDataLen(0)
	{
	}
};
00988
00989 }
00990
00992 void
00993 processInputOutput(OutputCallback& output, Array<PopenStreams>& streams, Array<ProcessStatus>& processStatuses, InputCallback& input, int timeoutsecs)
00994 {
00995 processStatuses.clear();
00996 processStatuses.resize(streams.size());
00997
00998 Array<ProcessOutputState> processStates(streams.size());
00999 int numOpenPipes(streams.size() * 2);
01000
01001 DateTime curTime;
01002 curTime.setToCurrent();
01003 DateTime timeoutEnd(curTime);
01004 timeoutEnd += timeoutsecs;
01005
01006 Array<Array<char> > inputs(processStates.size());
01007 for (size_t i = 0; i < processStates.size(); ++i)
01008 {
01009 input.getData(inputs[i], streams[i], i);
01010 processStates[i].availableDataLen = inputs[i].size();
01011 if (!streams[i].out()->isOpen())
01012 {
01013 processStates[i].outIsOpen = false;
01014 }
01015 if (!streams[i].err()->isOpen())
01016 {
01017 processStates[i].errIsOpen = false;
01018 }
01019 if (!streams[i].in()->isOpen())
01020 {
01021 processStates[i].inIsOpen = false;
01022 }
01023
01024 }
01025
01026 while (numOpenPipes > 0)
01027 {
01028 Select::SelectObjectArray selObjs;
01029 std::map<int, int> inputIndexProcessIndex;
01030 std::map<int, int> outputIndexProcessIndex;
01031 for (size_t i = 0; i < streams.size(); ++i)
01032 {
01033 if (processStates[i].outIsOpen)
01034 {
01035 Select::SelectObject selObj(streams[i].out()->getSelectObj());
01036 selObj.waitForRead = true;
01037 selObjs.push_back(selObj);
01038 inputIndexProcessIndex[selObjs.size() - 1] = i;
01039 }
01040 if (processStates[i].errIsOpen)
01041 {
01042 Select::SelectObject selObj(streams[i].err()->getSelectObj());
01043 selObj.waitForRead = true;
01044 selObjs.push_back(selObj);
01045 inputIndexProcessIndex[selObjs.size() - 1] = i;
01046 }
01047 if (processStates[i].inIsOpen && processStates[i].availableDataLen > 0)
01048 {
01049 Select::SelectObject selObj(streams[i].in()->getWriteSelectObj());
01050 selObj.waitForWrite = true;
01051 selObjs.push_back(selObj);
01052 outputIndexProcessIndex[selObjs.size() - 1] = i;
01053 }
01054
01055
01056 if (streams[i].pid() != -1)
01057 {
01058 pid_t waitpidrv;
01059 int processStatus(-1);
01060 waitpidrv = noIntrWaitPid(streams[i].pid(), &processStatus, WNOHANG);
01061 if (waitpidrv == -1)
01062 {
01063 streams[i].pid(-1);
01064 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: waitpid() failed");
01065 }
01066 else if (waitpidrv != 0)
01067 {
01068 streams[i].pid(-1);
01069 streams[i].setProcessStatus(processStatus);
01070 processStatuses[i] = ProcessStatus(processStatus);
01071 }
01072 }
01073 }
01074
01075 const int mstimeout = 100;
01076 int selectrval = Select::selectRW(selObjs, mstimeout);
01077 switch (selectrval)
01078 {
01079 case Select::SELECT_INTERRUPTED:
01080
01081 break;
01082 case Select::SELECT_ERROR:
01083 {
01084 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: error selecting on stdout and stderr");
01085 }
01086 break;
01087 case Select::SELECT_TIMEOUT:
01088 {
01089
01090
01091 for (size_t i = 0; i < streams.size(); ++i)
01092 {
01093 if (streams[i].pid() == -1)
01094 {
01095 if (processStates[i].inIsOpen)
01096 {
01097 processStates[i].inIsOpen = false;
01098 streams[i].in()->close();
01099 }
01100 if (processStates[i].outIsOpen)
01101 {
01102 processStates[i].outIsOpen = false;
01103 streams[i].out()->close();
01104 --numOpenPipes;
01105 }
01106 if (processStates[i].errIsOpen)
01107 {
01108 processStates[i].errIsOpen = false;
01109 streams[i].err()->close();
01110 --numOpenPipes;
01111 }
01112 }
01113 }
01114
01115 curTime.setToCurrent();
01116 if (timeoutsecs >= 0 && curTime > timeoutEnd)
01117 {
01118 BLOCXX_THROW(ExecTimeoutException, "Exec::gatherOutput: timedout");
01119 }
01120 }
01121 break;
01122 default:
01123 {
01124 int availableToFind = selectrval;
01125
01126 curTime.setToCurrent();
01127 timeoutEnd = curTime;
01128 timeoutEnd += timeoutsecs;
01129
01130 for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01131 {
01132 if (!selObjs[i].readAvailable)
01133 {
01134 continue;
01135 }
01136 else
01137 {
01138 --availableToFind;
01139 }
01140 int streamIndex = inputIndexProcessIndex[i];
01141 UnnamedPipeRef readstream;
01142 if (processStates[streamIndex].outIsOpen)
01143 {
01144 if (streams[streamIndex].out()->getSelectObj() == selObjs[i].s)
01145 {
01146 readstream = streams[streamIndex].out();
01147 }
01148 }
01149
01150 if (!readstream && processStates[streamIndex].errIsOpen)
01151 {
01152 if (streams[streamIndex].err()->getSelectObj() == selObjs[i].s)
01153 {
01154 readstream = streams[streamIndex].err();
01155 }
01156 }
01157
01158 if (!readstream)
01159 {
01160 continue;
01161 }
01162
01163 char buff[1024];
01164 int readrc = readstream->read(buff, sizeof(buff) - 1);
01165 if (readrc == 0)
01166 {
01167 if (readstream == streams[streamIndex].out())
01168 {
01169 processStates[streamIndex].outIsOpen = false;
01170 streams[streamIndex].out()->close();
01171 }
01172 else
01173 {
01174 processStates[streamIndex].errIsOpen = false;
01175 streams[streamIndex].err()->close();
01176 }
01177 --numOpenPipes;
01178 }
01179 else if (readrc == -1)
01180 {
01181 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: read error");
01182 }
01183 else
01184 {
01185 buff[readrc] = '\0';
01186 output.handleData(buff, readrc, readstream == streams[streamIndex].out() ? E_STDOUT : E_STDERR, streams[streamIndex],
01187 streamIndex, inputs[streamIndex]);
01188 }
01189 }
01190
01191
01192 for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01193 {
01194 if (!selObjs[i].writeAvailable)
01195 {
01196 continue;
01197 }
01198 else
01199 {
01200 --availableToFind;
01201 }
01202 int streamIndex = outputIndexProcessIndex[i];
01203 UnnamedPipeRef writestream;
01204 if (processStates[streamIndex].inIsOpen)
01205 {
01206 writestream = streams[streamIndex].in();
01207 }
01208
01209 if (!writestream)
01210 {
01211 continue;
01212 }
01213
01214 size_t offset = inputs[streamIndex].size() - processStates[streamIndex].availableDataLen;
01215 int writerc = writestream->write(&inputs[streamIndex][offset], processStates[streamIndex].availableDataLen);
01216 if (writerc == 0)
01217 {
01218 processStates[streamIndex].inIsOpen = false;
01219 streams[streamIndex].in()->close();
01220 }
01221 else if (writerc == -1)
01222 {
01223 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: write error");
01224 }
01225 else
01226 {
01227 inputs[streamIndex].erase(inputs[streamIndex].begin(), inputs[streamIndex].begin() + writerc);
01228 input.getData(inputs[streamIndex], streams[streamIndex], streamIndex);
01229 processStates[streamIndex].availableDataLen = inputs[streamIndex].size();
01230 }
01231 }
01232 }
01233 break;
01234 }
01235 }
01236 }
01237
01238 }
01239 #endif
01240 }
01241