Exec.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2004 Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2005 Novell, Inc. All rights reserved.
00004 *
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 *
00008 *  - Redistributions of source code must retain the above copyright notice,
00009 *    this list of conditions and the following disclaimer.
00010 *
00011 *  - Redistributions in binary form must reproduce the above copyright notice,
00012 *    this list of conditions and the following disclaimer in the documentation
00013 *    and/or other materials provided with the distribution.
00014 *
00015 *  - Neither the name of Vintela, Inc., Novell, Inc., nor the names of its
00016 *    contributors may be used to endorse or promote products derived from this
00017 *    software without specific prior written permission.
00018 *
00019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00020 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00021 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00022 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc., Novell, Inc., OR THE 
00023 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00024 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00025 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
00026 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00027 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
00028 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
00029 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 *******************************************************************************/
00031 
00036 #include "blocxx/BLOCXX_config.h"
00037 #include "blocxx/Exec.hpp"
00038 #include "blocxx/Format.hpp"
00039 #include "blocxx/Assertion.hpp"
00040 #include "blocxx/PosixUnnamedPipe.hpp"
00041 #include "blocxx/Array.hpp"
00042 #include "blocxx/IOException.hpp"
00043 #include "blocxx/Thread.hpp"
00044 #include "blocxx/Select.hpp"
00045 #include "blocxx/ExceptionIds.hpp"
00046 #include "blocxx/IntrusiveCountableBase.hpp"
00047 #include "blocxx/DateTime.hpp"
00048 #include "blocxx/AutoPtr.hpp"
00049 
00050 #include <map>
00051 
00052 extern "C"
00053 {
00054 #ifdef BLOCXX_HAVE_SYS_RESOURCE_H
00055 #include <sys/resource.h>
00056 #endif
00057 #ifndef BLOCXX_WIN32
00058 #include <unistd.h>
00059 #include <sys/wait.h>
00060 #include <fcntl.h>
00061 #endif
00062 #include <errno.h>
00063 #include <stdio.h> // for perror
00064 #include <signal.h>
00065 }
00066 
00067 #include <cerrno>
00068 #include <iostream>  // for cerr
00069 
00070 // NSIG may be defined by signal.h, otherwise 64 should be plenty.
00071 #ifndef NSIG
00072 #define NSIG 64
00073 #endif
00074 
00075 namespace BLOCXX_NAMESPACE
00076 {
00077 
00078 using std::cerr;
00079 using std::endl;
00080 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecTimeout);
00081 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecBufferFull);
00082 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecError);
00083 
00084 #ifndef BLOCXX_WIN32
00085 class PopenStreamsImpl : public IntrusiveCountableBase
00086 {
00087 public:
00088    PopenStreamsImpl();
00089    ~PopenStreamsImpl();
00090    UnnamedPipeRef in() const;
00091    void in(const UnnamedPipeRef& pipe);
00092    UnnamedPipeRef out() const;
00093    void out(const UnnamedPipeRef& pipe);
00094    UnnamedPipeRef err() const;
00095    void err(const UnnamedPipeRef& pipe);
00096    Array<UnnamedPipeRef> extraPipes() const;
00097    void setExtraPipes(const Array<UnnamedPipeRef>& pipes);
00098 
00099    pid_t pid();
00100    void pid(pid_t newPid);
00101    int getExitStatus();
00102    int getExitStatus(UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term);
00103    void setProcessStatus(int ps)
00104    {
00105       m_processstatus = ps;
00106    }
00107 private:
00108    UnnamedPipeRef m_in;
00109    UnnamedPipeRef m_out;
00110    UnnamedPipeRef m_err;
00111    Array<UnnamedPipeRef> m_extraPipes;
00112    pid_t m_pid;
00113    int m_processstatus;
00114 };
00116 PopenStreamsImpl::PopenStreamsImpl()
00117    : m_pid(-1)
00118    , m_processstatus(-1)
00119 {
00120 }
00122 UnnamedPipeRef PopenStreamsImpl::in() const
00123 {
00124    return m_in;
00125 }
00127 void PopenStreamsImpl::in(const UnnamedPipeRef& pipe)
00128 {
00129    m_in = pipe;
00130 }
00132 UnnamedPipeRef PopenStreamsImpl::out() const
00133 {
00134    return m_out;
00135 }
00137 void PopenStreamsImpl::out(const UnnamedPipeRef& pipe)
00138 {
00139    m_out = pipe;
00140 }
00142 UnnamedPipeRef PopenStreamsImpl::err() const
00143 {
00144    return m_err;
00145 }
00147 void PopenStreamsImpl::err(const UnnamedPipeRef& pipe)
00148 {
00149    m_err = pipe;
00150 }
00152 Array<UnnamedPipeRef> PopenStreamsImpl::extraPipes() const
00153 {
00154    return m_extraPipes;
00155 }
00157 void PopenStreamsImpl::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00158 {
00159    m_extraPipes = pipes;
00160 }
00162 pid_t PopenStreamsImpl::pid()
00163 {
00164    return m_pid;
00165 }
00167 void PopenStreamsImpl::pid(pid_t newPid)
00168 {
00169    m_pid = newPid;
00170 }
00172 static inline ProcId safeWaitPid(ProcId pid, int* status, int options)
00173 {
00174    // The status is not passed directly to waitpid because some implementations
00175    // store a value there even when the function returns <= 0.
00176    int localReturnValue = -1;
00177    pid_t returnedPID = ::waitpid(pid, &localReturnValue, options);
00178    if( returnedPID > 0 )
00179    {
00180       *status = localReturnValue;
00181    }  
00182    return returnedPID;
00183 }
00184 
00186 static ProcId noIntrWaitPid(ProcId pid, int* status, int options)
00187 {
00188    pid_t waitpidrv;
00189    do
00190    {
00191       Thread::testCancel();
00192       waitpidrv = safeWaitPid(pid, status, options);
00193    } while (waitpidrv == -1 && errno == EINTR);
00194    return waitpidrv;
00195 }
00196 
00198 static inline void
00199 milliSleep(UInt32 milliSeconds)
00200 {
00201    Thread::sleep(milliSeconds);
00202 }
00204 static inline void
00205 secSleep(UInt32 seconds)
00206 {
00207    Thread::sleep(seconds * 1000);
00208 }
00210 static bool
00211 timedWaitPid(ProcId pid, int * pstatus, UInt32 wait_time)
00212 {
00213    UInt32 const N = 154;
00214    UInt32 const M = 128;  // N/M is about 1.20
00215    UInt32 const MAXPERIOD = 5000;
00216    UInt32 period = 100;
00217    UInt32 t = 0;
00218    ProcId waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00219    while (t < wait_time && waitpidrv == 0) {
00220       milliSleep(period);
00221       t += period;
00222       period *= N;
00223       period /= M; 
00224       if (period > MAXPERIOD)
00225       {
00226          period = MAXPERIOD;
00227       }
00228       waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00229    }
00230    if (waitpidrv < 0) {
00231       BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "waitpid() failed.");
00232    }
00233    return waitpidrv != 0;
00234 }
00235 
00237 // Send signal sig to the process, then wait at most wait_time milliseconds.
00238 // for the process to terminate.  Return true if termination detected.
00239 //
00240 static bool killWait(
00241    ProcId pid, int * pstatus, UInt32 wait_time, int sig, char const * signame
00242 )
00243 {
00244    if (::kill(pid, sig) == -1) {
00245       // don't trust waitpid, Format ctor, etc. to leave errno alone
00246       int errnum = errno;
00247       // maybe kill() failed because child terminated first
00248       if (noIntrWaitPid(pid, pstatus, WNOHANG) > 0) {
00249          return true;
00250       }
00251       else {
00252          Format fmt("Failed sending %1 to process %2.", signame, pid);
00253          char const * msg = fmt.c_str();
00254          errno = errnum;
00255          BLOCXX_THROW_ERRNO_MSG(ExecErrorException, msg);
00256       }
00257    }
00258    return timedWaitPid(pid, pstatus, wait_time);
00259 }
00260 
00262 int PopenStreamsImpl::getExitStatus()
00263 {
00264    return this->getExitStatus(0, 10 *1000, 10 * 1000);
00265 }
00266 
00268 int PopenStreamsImpl::getExitStatus(
00269    UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term)
00270 {
00271    if (m_pid < 0)
00272    {
00273       return m_processstatus;
00274    }
00275    if (m_pid == ::getpid())
00276    {
00277       BLOCXX_THROW(ExecErrorException, "PopenStreamsImpl::getExitStatus: m_pid == getpid()");
00278    }
00279 
00280    ProcId pid = m_pid;
00281    m_pid = -1;
00282    int * pstatus = &m_processstatus;
00283 
00284    // Convert times to milliseconds
00285    wait_initial *= 1000;
00286    wait_close *= 1000;
00287    wait_term *= 1000;
00288 
00289    if (wait_initial > 0 && timedWaitPid(pid, pstatus, wait_initial))
00290    {
00291       return m_processstatus;
00292    }
00293 
00294    if (wait_close > 0)
00295    {
00296       // Close the streams. If the child process is blocked waiting to output,
00297       // then this will cause it to get a SIGPIPE, and it may be able to clean
00298       // up after itself.  Likewise, if the child process is blocked waiting
00299       // for input, it will now detect EOF.
00300       UnnamedPipeRef upr;
00301       if (upr = in())
00302       {
00303          upr->close();
00304       }
00305       if (upr = out())
00306       {
00307          upr->close();
00308       }
00309       if (upr = err())
00310       {
00311          upr->close();
00312       }
00313       if (timedWaitPid(pid, pstatus, wait_close))
00314       {
00315          return m_processstatus;
00316       }
00317    }
00318 
00319    if (wait_term > 0 && killWait(pid, pstatus, wait_term, SIGTERM, "SIGTERM"))
00320    {
00321       return m_processstatus;
00322    }
00323    if (!killWait(pid, pstatus, 5000, SIGKILL, "SIGKILL")) {
00324       BLOCXX_THROW(
00325          ExecErrorException, "PopenStreamsImpl::getExitStatus: Child process has not exited after sending it a SIGKILL."
00326       );
00327    }
00328    return m_processstatus;
00329 }
00331 PopenStreamsImpl::~PopenStreamsImpl()
00332 {
00333    try // can't let exceptions past.
00334    {
00335       // This will terminate the process.
00336       getExitStatus();
00337    }
00338    catch (...)
00339    {
00340    }
00341 }
00342 
00344 PopenStreams::PopenStreams()
00345    : m_impl(new PopenStreamsImpl)
00346 {
00347 }
00349 PopenStreams::~PopenStreams()
00350 {
00351 }
00353 UnnamedPipeRef PopenStreams::in() const
00354 {
00355    return m_impl->in();
00356 }
00358 void PopenStreams::in(const UnnamedPipeRef& pipe)
00359 {
00360    m_impl->in(pipe);
00361 }
00363 UnnamedPipeRef PopenStreams::out() const
00364 {
00365    return m_impl->out();
00366 }
00368 void PopenStreams::out(const UnnamedPipeRef& pipe)
00369 {
00370    m_impl->out(pipe);
00371 }
00373 UnnamedPipeRef PopenStreams::err() const
00374 {
00375    return m_impl->err();
00376 }
00378 void PopenStreams::err(const UnnamedPipeRef& pipe)
00379 {
00380    m_impl->err(pipe);
00381 }
00383 Array<UnnamedPipeRef> PopenStreams::extraPipes() const
00384 {
00385    return m_impl->extraPipes();
00386 }
00388 void PopenStreams::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00389 {
00390    m_impl->setExtraPipes(pipes);
00391 }
00393 pid_t PopenStreams::pid() const
00394 {
00395    return m_impl->pid();
00396 }
00398 void PopenStreams::pid(pid_t newPid)
00399 {
00400    m_impl->pid(newPid);
00401 }
00403 int PopenStreams::getExitStatus()
00404 {
00405    return m_impl->getExitStatus();
00406 }
00408 int PopenStreams::getExitStatus(UInt32 wait0, UInt32 wait1, UInt32 wait2)
00409 {
00410    return m_impl->getExitStatus(wait0, wait1, wait2);
00411 }
00413 void PopenStreams::setProcessStatus(int ps)
00414 {
00415    m_impl->setProcessStatus(ps);
00416 }
00418 PopenStreams::PopenStreams(const PopenStreams& src)
00419    : m_impl(src.m_impl)
00420 {
00421 }
00423 PopenStreams& PopenStreams::operator=(const PopenStreams& src)
00424 {
00425    m_impl = src.m_impl;
00426    return *this;
00427 }
00428 
00430 bool operator==(const PopenStreams& x, const PopenStreams& y)
00431 {
00432    return x.m_impl == y.m_impl;
00433 }
00434 
00436 namespace Exec
00437 {
00438 
00440 int 
00441 safeSystem(const Array<String>& command, const EnvVars& envVars)
00442 {
00443    return safeSystem(command, envVars.getenvp());
00444 }
00445 
00447 int
00448 safeSystem(const Array<String>& command, const char* const envp[])
00449 {
00450    int status;
00451    pid_t pid;
00452    if (command.size() == 0)
00453    {
00454       return 1;
00455    }
00456 
00457    // This has to be done before fork().  In a multi-threaded app, calling new after fork() can cause a deadlock.
00458    AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00459    for (size_t i = 0; i < command.size(); i++)
00460    {
00461       argv[i] = command[i].c_str();
00462    }
00463    argv[command.size()] = 0;
00464 
00465    pid = ::fork();
00466    if (pid == -1)
00467    {
00468       return -1;
00469    }
00470    if (pid == 0)
00471    {
00472       try
00473       {
00474 
00475          // according to susv3:
00476          //        This  volume  of  IEEE Std 1003.1-2001  specifies  that  signals set to
00477          //        SIG_IGN remain set to SIG_IGN, and that  the  process  signal  mask  be
00478          //        unchanged  across an exec. This is consistent with historical implemen-
00479          //        tations, and it permits some useful functionality, such  as  the  nohup
00480          //        command.  However,  it  should be noted that many existing applications
00481          //        wrongly assume that they start with certain signals set to the  default
00482          //        action  and/or  unblocked.  In  particular, applications written with a
00483          //        simpler signal model that does not include blocking of signals, such as
00484          //        the  one in the ISO C standard, may not behave properly if invoked with
00485          //        some signals blocked. Therefore, it is best not to block or ignore sig-
00486          //        nals  across execs without explicit reason to do so, and especially not
00487          //        to block signals across execs of arbitrary (not  closely  co-operating)
00488          //        programs.
00489 
00490          // so we'll reset the signal mask and all signal handlers to SIG_DFL. We set them all
00491          // just in case the current handlers may misbehave now that we've fork()ed.
00492          sigset_t emptymask;
00493          sigemptyset(&emptymask);
00494          ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00495 
00496          for (size_t sig = 1; sig <= NSIG; ++sig)
00497          {
00498             struct sigaction temp;
00499             sigaction(sig, 0, &temp);
00500             temp.sa_handler = SIG_DFL;
00501             sigaction(sig, &temp, NULL);
00502          }
00503 
00504          // Close all file handle from parent process
00505          rlimit rl;
00506          int i = sysconf(_SC_OPEN_MAX);
00507          if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00508          {
00509             if ( i < 0 )
00510             {
00511                i = rl.rlim_max;
00512             }
00513             else
00514             {
00515                i = std::min<int>(rl.rlim_max, i);
00516             }
00517          }
00518          while (i > 2)
00519          {
00520             // set it for close on exec
00521             ::fcntl(i, F_SETFD, FD_CLOEXEC);
00522             i--;
00523          }
00524 
00525          int rval; 
00526          if (envp)
00527          {
00528             rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00529          }
00530          else
00531          {
00532             rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00533          }
00534          cerr << Format( "Exec::safeSystem: execv failed for program "
00535                "%1, rval is %2", argv[0], rval);
00536       }
00537       catch (...)
00538       {
00539          cerr << "something threw an exception after fork()!";
00540       }
00541       _exit(127);
00542    }
00543    do
00544    {
00545       Thread::testCancel();
00546       if (waitpid(pid, &status, 0) == -1)
00547       {
00548          if (errno != EINTR)
00549          {
00550             return -1;
00551          }
00552       }
00553       else
00554       {
00555          return WEXITSTATUS(status);
00556       }
00557    } while (1);
00558 }
00559 
00561 PopenStreams
00562 safePopen(const Array<String>& command,
00563       const String& initialInput)
00564 {
00565    PopenStreams retval = safePopen(command);
00566 
00567    if (initialInput != "")
00568    {
00569       if (retval.in()->write(initialInput.c_str(), initialInput.length()) == -1)
00570       {
00571          BLOCXX_THROW_ERRNO_MSG(IOException, "Exec::safePopen: Failed writing input to process");
00572       }
00573    }
00574 
00575    return retval;
00576 }
00577 
00579 PopenStreams 
00580 safePopen(const Array<String>& command, const EnvVars& envVars)
00581 {
00582    return safePopen(command, envVars.getenvp());
00583 }
00584 
00586 PopenStreams
00587 safePopen(const Array<String>& command, const char* const envp[])
00588 {
00589    // sent over the execErrorPipe if an exception is caught after fork()ing.
00590    // Negative because errno values are positive. Maybe this is a bad assumption? 
00591    // The worst that could happen is reporting an unknown exception instead of the real errno value.
00592    const int UNKNOWN_EXCEPTION = -2000; 
00593 
00594    if (command.size() == 0)
00595    {
00596       BLOCXX_THROW(ExecErrorException, "Exec::safePopen: command is empty");
00597    }
00598    
00599    PopenStreams retval;
00600    retval.in( UnnamedPipe::createUnnamedPipe() );
00601    UnnamedPipeRef upipeOut = UnnamedPipe::createUnnamedPipe();
00602    retval.out( upipeOut );
00603    UnnamedPipeRef upipeErr = UnnamedPipe::createUnnamedPipe();
00604    retval.err( upipeErr );
00605 
00606    UnnamedPipeRef execErrorPipe = UnnamedPipe::createUnnamedPipe();
00607 
00608    // This has to be done before fork().  In a multi-threaded app, calling new after fork() can cause a deadlock.
00609    AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00610    for (size_t i = 0; i < command.size(); i++)
00611    {
00612       argv[i] = command[i].c_str();
00613    }
00614    argv[command.size()] = 0;
00615 
00616    pid_t forkrv = ::fork();
00617    if (forkrv == -1)
00618    {
00619       BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::safePopen: fork() failed");
00620    }
00621    if (forkrv == 0)
00622    {
00623       int execErrorFd = -1;
00624       try
00625       {
00626 
00627          // child process
00628          // according to susv3:
00629          //        This  volume  of  IEEE Std 1003.1-2001  specifies  that  signals set to
00630          //        SIG_IGN remain set to SIG_IGN, and that  the  process  signal  mask  be
00631          //        unchanged  across an exec. This is consistent with historical implemen-
00632          //        tations, and it permits some useful functionality, such  as  the  nohup
00633          //        command.  However,  it  should be noted that many existing applications
00634          //        wrongly assume that they start with certain signals set to the  default
00635          //        action  and/or  unblocked.  In  particular, applications written with a
00636          //        simpler signal model that does not include blocking of signals, such as
00637          //        the  one in the ISO C standard, may not behave properly if invoked with
00638          //        some signals blocked. Therefore, it is best not to block or ignore sig-
00639          //        nals  across execs without explicit reason to do so, and especially not
00640          //        to block signals across execs of arbitrary (not  closely  co-operating)
00641          //        programs.
00642    
00643          // so we'll reset the signal mask and all signal handlers to SIG_DFL. We set them all
00644          // just in case the current handlers may misbehave now that we've fork()ed.
00645          sigset_t emptymask;
00646          sigemptyset(&emptymask);
00647          ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00648    
00649          for (size_t sig = 1; sig <= NSIG; ++sig)
00650          {
00651             struct sigaction temp;
00652             sigaction(sig, 0, &temp);
00653             temp.sa_handler = SIG_DFL;
00654             sigaction(sig, &temp, NULL);
00655          }
00656    
00657          // Close stdin, stdout, and stderr.
00658          close(0);
00659          close(1);
00660          close(2);
00661 
00662          // this should only fail because of programmer error.
00663          UnnamedPipeRef foo1 = retval.in();
00664          PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00665    
00666          UnnamedPipeRef foo2 = retval.out();
00667          PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00668    
00669          UnnamedPipeRef foo3 = retval.err();
00670          PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00671 
00672          
00673          BLOCXX_ASSERT(in);
00674          BLOCXX_ASSERT(out);
00675          BLOCXX_ASSERT(err);
00676          // connect stdin, stdout, and stderr to the return pipes.
00677          int rv = dup2(in->getInputHandle(), 0);
00678          BLOCXX_ASSERT(rv != -1);
00679          rv = dup2(out->getOutputHandle(), 1);
00680          BLOCXX_ASSERT(rv != -1);
00681          rv = dup2(err->getOutputHandle(), 2);
00682          BLOCXX_ASSERT(rv != -1);
00683 
00684          // set up the execError fd
00685          PosixUnnamedPipeRef execError = execErrorPipe.cast_to<PosixUnnamedPipe>();
00686          BLOCXX_ASSERT(execError);
00687          execErrorFd = execError->getOutputHandle();
00688 
00689 
00690          // Close all other file handle from parent process
00691          rlimit rl;
00692          int i = sysconf(_SC_OPEN_MAX);
00693          if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00694          {
00695             if ( i < 0 )
00696             {
00697                i = rl.rlim_max;
00698             }
00699             else
00700             {
00701                i = std::min<int>(rl.rlim_max, i);
00702             }
00703          }
00704          while (i > 2)
00705          {
00706             // set it for close on exec
00707             ::fcntl(i, F_SETFD, FD_CLOEXEC);
00708             i--;
00709          }
00710    
00711          int rval = 0;
00712          if (envp)
00713          {
00714             rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00715          }
00716          else
00717          {
00718             rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00719          }
00720          // send errno over the pipe
00721          int lerrno = errno;
00722          write(execErrorFd, &lerrno, sizeof(lerrno));
00723       }
00724       catch (...)
00725       {
00726          int errorVal = UNKNOWN_EXCEPTION;
00727          write(execErrorFd, &errorVal, sizeof(errorVal));
00728       }
00729       _exit(127);
00730    }
00731 
00732    // parent process
00733    retval.pid (forkrv);
00734 
00735    // this should only fail because of programmer error.
00736    UnnamedPipeRef foo1 = retval.in();
00737    PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00738    UnnamedPipeRef foo2 = retval.out(); 
00739    PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00740    UnnamedPipeRef foo3 = retval.err(); 
00741    PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00742    BLOCXX_ASSERT(in);
00743    BLOCXX_ASSERT(out);
00744    BLOCXX_ASSERT(err);
00745    // prevent the parent from using the child's end of the pipes.
00746    in->closeInputHandle();
00747    out->closeOutputHandle();
00748    err->closeOutputHandle();
00749    
00750    PosixUnnamedPipeRef execErrorPosixPipe = execErrorPipe.cast_to<PosixUnnamedPipe>();
00751    BLOCXX_ASSERT(execErrorPosixPipe);
00752    // we need to close the parent's output side so that when the child's output side is closed, it can be detected.
00753    execErrorPosixPipe->closeOutputHandle();
00754 
00755    const int SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC = 10; // 10 seconds should be plenty for the child to go from fork() to execv()
00756    execErrorPipe->setReadTimeout(SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC);
00757 
00758    int childErrorCode = 0;
00759    int bytesRead = execErrorPipe->read(&childErrorCode, sizeof(childErrorCode));
00760    // 0 bytes means execv() happened successfully.
00761    if (bytesRead == ETIMEDOUT) // broken interface... grumble, grumble...
00762    {
00763       // for some reason the child never ran exec(). Must've deadlocked or the system is *really* loaded down.
00764       // Kill it forcefully.
00765       kill(forkrv, SIGKILL);
00766       BLOCXX_THROW(ExecErrorException, "Exec::safePopen: timed out waiting for child process to exec()");
00767    }
00768    if (bytesRead > 0)
00769    {
00770       // exec failed
00771       if (childErrorCode == UNKNOWN_EXCEPTION)
00772       {
00773          BLOCXX_THROW(ExecErrorException, "Exec::safePopen: child process caught an exception before reaching exec()");
00774       }
00775       else
00776       {
00777          errno = childErrorCode;
00778          BLOCXX_THROW_ERRNO_MSG(ExecErrorException, Format("Exec::safePopen: child process failed running exec() process = %1", command[0]));
00779       }
00780    }
00781 
00782    return retval;
00783 }
00784 
00785 namespace
00786 {
00787 
00788 #ifndef BLOCXX_MIN
00789 #define BLOCXX_MIN(x, y) (x) < (y) ? (x) : (y)
00790 #endif
00791 
00793 class StringOutputGatherer : public OutputCallback
00794 {
00795 public:
00796    StringOutputGatherer(String& output, int outputLimit)
00797       : m_stdout(output)
00798       , m_stderr(output)
00799       , m_outputLimit(outputLimit)
00800    {
00801    }
00802    StringOutputGatherer(String& stdOutput, String& errOutput, int outputLimit)
00803       : m_stdout(stdOutput)
00804       , m_stderr(errOutput)
00805       , m_outputLimit(outputLimit)
00806    {
00807    }
00808 private:
00809    virtual void doHandleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00810    {
00811       String& m_output(outputSource == E_STDOUT ? m_stdout : m_stderr);
00812 
00813       if (m_outputLimit >= 0 && m_output.length() + dataLen > static_cast<size_t>(m_outputLimit))
00814       {
00815          // the process output too much, so just copy what we can and return error
00816          int lentocopy = BLOCXX_MIN(m_outputLimit - m_output.length(), dataLen);
00817          if (lentocopy >= 0)
00818          {
00819             m_output += String(data, lentocopy);
00820          }
00821          BLOCXX_THROW(ExecBufferFullException, "Exec::StringOutputGatherer::doHandleData(): buffer full");
00822       }
00823 
00824       m_output += data;
00825    }
00826    String& m_stdout;
00827    String& m_stderr;
00828    int m_outputLimit;
00829 };
00830 
00832 class SingleStringInputCallback : public InputCallback
00833 {
00834 public:
00835    SingleStringInputCallback(const String& s)
00836       : m_s(s)
00837    {
00838    }
00839 private:
00840    virtual void doGetData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00841    {
00842       if (m_s.length() > 0)
00843       {
00844          inputBuffer.insert(inputBuffer.end(), m_s.c_str(), m_s.c_str() + m_s.length());
00845          m_s.erase();
00846       }
00847       else if (theStream.in()->isOpen())
00848       {
00849          theStream.in()->close();
00850       }
00851    }
00852    String m_s;
00853 };
00854 
00855 }// end anonymous namespace
00856 
00858 void
00859 executeProcessAndGatherOutput(const Array<String>& command,
00860    String& output, int& processStatus,
00861    int timeoutSecs, int outputLimit, const String& input)
00862 {
00863    executeProcessAndGatherOutput(command, output, processStatus,
00864       EnvVars(EnvVars::E_CURRENT_ENVIRONMENT),
00865       timeoutSecs, outputLimit, input);
00866 }
00867 
00869 void executeProcessAndGatherOutput(
00870    const Array<String>& command,
00871    String& output, 
00872    int& processStatus, 
00873    const EnvVars& envVars,
00874    int timeoutSecs, 
00875    int outputLimit, 
00876    const String& input)
00877 {
00878    processStatus = -1;
00879    Array<PopenStreams> streams;
00880    streams.push_back(safePopen(command, envVars));
00881    Array<ProcessStatus> processStatuses(1);
00882    SingleStringInputCallback singleStringInputCallback(input);
00883 
00884    StringOutputGatherer gatherer(output, outputLimit);
00885    processInputOutput(gatherer, streams, processStatuses, 
00886       singleStringInputCallback, timeoutSecs);
00887 
00888    if (processStatuses[0].hasExited())
00889    {
00890       processStatus = processStatuses[0].getStatus();
00891    }
00892    else
00893    {
00894       processStatus = streams[0].getExitStatus();
00895    }
00896 }
00897 
00899 void executeProcessAndGatherOutput(
00900    const Array<String>& command,
00901    String& stdOutput,
00902    String& errOutput,
00903    int& processStatus, 
00904    const EnvVars& envVars,
00905    int timeoutSecs, 
00906    int outputLimit, 
00907    const String& input)
00908 {
00909    processStatus = -1;
00910    Array<PopenStreams> streams;
00911    streams.push_back(safePopen(command, envVars));
00912    Array<ProcessStatus> processStatuses(1);
00913    SingleStringInputCallback singleStringInputCallback(input);
00914 
00915    StringOutputGatherer gatherer(stdOutput, errOutput, outputLimit);
00916    processInputOutput(gatherer, streams, processStatuses, 
00917       singleStringInputCallback, timeoutSecs);
00918 
00919    if (processStatuses[0].hasExited())
00920    {
00921       processStatus = processStatuses[0].getStatus();
00922    }
00923    else
00924    {
00925       processStatus = streams[0].getExitStatus();
00926    }
00927 }
00928 
00930 void
00931 gatherOutput(String& output, PopenStreams& stream, int& processStatus, int timeoutSecs, int outputLimit)
00932 {
00933    Array<PopenStreams> streams;
00934    streams.push_back(stream);
00935    Array<ProcessStatus> processStatuses(1);
00936 
00937    StringOutputGatherer gatherer(output, outputLimit);
00938    SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(String());
00939    processInputOutput(gatherer, streams, processStatuses, singleStringInputCallback, timeoutSecs);
00940    if (processStatuses[0].hasExited())
00941    {
00942       processStatus = processStatuses[0].getStatus();
00943    }
00944 }
00945 
00947 OutputCallback::~OutputCallback()
00948 {
00949 
00950 }
00951 
00953 void
00954 OutputCallback::handleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00955 {
00956    doHandleData(data, dataLen, outputSource, theStream, streamIndex, inputBuffer);
00957 }
00958 
00960 InputCallback::~InputCallback()
00961 {
00962 }
00963 
00965 void
00966 InputCallback::getData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00967 {
00968    doGetData(inputBuffer, theStream, streamIndex);
00969 }
00970 
00971 namespace
00972 {
00973    struct ProcessOutputState
00974    {
00975       bool inIsOpen;
00976       bool outIsOpen;
00977       bool errIsOpen;
00978       size_t availableDataLen;
00979 
00980       ProcessOutputState()
00981          : inIsOpen(true)
00982          , outIsOpen(true)
00983          , errIsOpen(true)
00984          , availableDataLen(0)
00985       {
00986       }
00987    };
00988 
00989 }
00990 
00992 void
00993 processInputOutput(OutputCallback& output, Array<PopenStreams>& streams, Array<ProcessStatus>& processStatuses, InputCallback& input, int timeoutsecs)
00994 {
00995    processStatuses.clear();
00996    processStatuses.resize(streams.size());
00997 
00998    Array<ProcessOutputState> processStates(streams.size());
00999    int numOpenPipes(streams.size() * 2); // count of stdout & stderr. Ignore stdin for purposes of algorithm termination.
01000 
01001    DateTime curTime;
01002    curTime.setToCurrent();
01003    DateTime timeoutEnd(curTime);
01004    timeoutEnd += timeoutsecs;
01005 
01006    Array<Array<char> > inputs(processStates.size());
01007    for (size_t i = 0; i < processStates.size(); ++i)
01008    {
01009       input.getData(inputs[i], streams[i], i);
01010       processStates[i].availableDataLen = inputs[i].size();
01011       if (!streams[i].out()->isOpen())
01012       {
01013          processStates[i].outIsOpen = false;
01014       }
01015       if (!streams[i].err()->isOpen())
01016       {
01017          processStates[i].errIsOpen = false;
01018       }
01019       if (!streams[i].in()->isOpen())
01020       {
01021          processStates[i].inIsOpen = false;
01022       }
01023 
01024    }
01025 
01026    while (numOpenPipes > 0)
01027    {
01028       Select::SelectObjectArray selObjs; 
01029       std::map<int, int> inputIndexProcessIndex;
01030       std::map<int, int> outputIndexProcessIndex;
01031       for (size_t i = 0; i < streams.size(); ++i)
01032       {
01033          if (processStates[i].outIsOpen)
01034          {
01035             Select::SelectObject selObj(streams[i].out()->getSelectObj()); 
01036             selObj.waitForRead = true; 
01037             selObjs.push_back(selObj); 
01038             inputIndexProcessIndex[selObjs.size() - 1] = i;
01039          }
01040          if (processStates[i].errIsOpen)
01041          {
01042             Select::SelectObject selObj(streams[i].err()->getSelectObj()); 
01043             selObj.waitForRead = true; 
01044             selObjs.push_back(selObj); 
01045             inputIndexProcessIndex[selObjs.size() - 1] = i;
01046          }
01047          if (processStates[i].inIsOpen && processStates[i].availableDataLen > 0)
01048          {
01049             Select::SelectObject selObj(streams[i].in()->getWriteSelectObj()); 
01050             selObj.waitForWrite = true; 
01051             selObjs.push_back(selObj); 
01052             outputIndexProcessIndex[selObjs.size() - 1] = i;
01053          }
01054 
01055          // check if the child has exited - the pid gets set to -1 once it's exited.
01056          if (streams[i].pid() != -1)
01057          {
01058             pid_t waitpidrv;
01059             int processStatus(-1);
01060             waitpidrv = noIntrWaitPid(streams[i].pid(), &processStatus, WNOHANG);
01061             if (waitpidrv == -1)
01062             {
01063                streams[i].pid(-1);
01064                BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: waitpid() failed");
01065             }
01066             else if (waitpidrv != 0)
01067             {
01068                streams[i].pid(-1);
01069                streams[i].setProcessStatus(processStatus);
01070                processStatuses[i] = ProcessStatus(processStatus);
01071             }
01072          }
01073       }
01074 
01075       const int mstimeout = 100; // use 1/10 of a second
01076       int selectrval = Select::selectRW(selObjs, mstimeout);
01077       switch (selectrval)
01078       {
01079          case Select::SELECT_INTERRUPTED:
01080             // if we got interrupted, just try again
01081             break;
01082          case Select::SELECT_ERROR:
01083          {
01084             BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: error selecting on stdout and stderr");
01085          }
01086          break;
01087          case Select::SELECT_TIMEOUT:
01088          {
01089             // Check all processes and see if they've exited but the pipes are still open. If so, close the pipes,
01090             // since there's nothing to read from them.
01091             for (size_t i = 0; i < streams.size(); ++i)
01092             {
01093                if (streams[i].pid() == -1)
01094                {
01095                   if (processStates[i].inIsOpen)
01096                   {
01097                      processStates[i].inIsOpen = false;
01098                      streams[i].in()->close();
01099                   }
01100                   if (processStates[i].outIsOpen)
01101                   {
01102                      processStates[i].outIsOpen = false;
01103                      streams[i].out()->close();
01104                      --numOpenPipes;
01105                   }
01106                   if (processStates[i].errIsOpen)
01107                   {
01108                      processStates[i].errIsOpen = false;
01109                      streams[i].err()->close();
01110                      --numOpenPipes;
01111                   }
01112                }
01113             }
01114 
01115             curTime.setToCurrent();
01116             if (timeoutsecs >= 0 && curTime > timeoutEnd)
01117             {
01118                BLOCXX_THROW(ExecTimeoutException, "Exec::gatherOutput: timedout");
01119             }
01120          }
01121          break;
01122          default:
01123          {
01124             int availableToFind = selectrval;
01125             // reset the timeout counter
01126             curTime.setToCurrent();
01127             timeoutEnd = curTime;
01128             timeoutEnd += timeoutsecs;
01129 
01130             for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01131             {
01132                if (!selObjs[i].readAvailable)
01133                {
01134                   continue;
01135                }
01136                else
01137                {
01138                   --availableToFind;
01139                }
01140                int streamIndex = inputIndexProcessIndex[i];
01141                UnnamedPipeRef readstream;
01142                if (processStates[streamIndex].outIsOpen)
01143                {
01144                   if (streams[streamIndex].out()->getSelectObj() == selObjs[i].s)
01145                   {
01146                      readstream = streams[streamIndex].out();
01147                   }
01148                }
01149 
01150                if (!readstream && processStates[streamIndex].errIsOpen)
01151                {
01152                   if (streams[streamIndex].err()->getSelectObj() == selObjs[i].s)
01153                   {
01154                      readstream = streams[streamIndex].err();
01155                   }
01156                }
01157 
01158                if (!readstream)
01159                {
01160                   continue; // for loop
01161                }
01162 
01163                char buff[1024];
01164                int readrc = readstream->read(buff, sizeof(buff) - 1);
01165                if (readrc == 0)
01166                {
01167                   if (readstream == streams[streamIndex].out())
01168                   {
01169                      processStates[streamIndex].outIsOpen = false;
01170                      streams[streamIndex].out()->close();
01171                   }
01172                   else
01173                   {
01174                      processStates[streamIndex].errIsOpen = false;
01175                      streams[streamIndex].err()->close();
01176                   }
01177                   --numOpenPipes;
01178                }
01179                else if (readrc == -1)
01180                {
01181                   BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: read error");
01182                }
01183                else
01184                {
01185                   buff[readrc] = '\0';
01186                   output.handleData(buff, readrc, readstream == streams[streamIndex].out() ? E_STDOUT : E_STDERR, streams[streamIndex],
01187                      streamIndex, inputs[streamIndex]);
01188                }
01189             }
01190 
01191             // handle stdin for all processes which have data to send to them.
01192             for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01193             {
01194                if (!selObjs[i].writeAvailable)
01195                {
01196                   continue;
01197                }
01198                else
01199                {
01200                   --availableToFind;
01201                }
01202                int streamIndex = outputIndexProcessIndex[i];
01203                UnnamedPipeRef writestream;
01204                if (processStates[streamIndex].inIsOpen)
01205                {
01206                   writestream = streams[streamIndex].in();
01207                }
01208 
01209                if (!writestream)
01210                {
01211                   continue; // for loop
01212                }
01213 
01214                size_t offset = inputs[streamIndex].size() - processStates[streamIndex].availableDataLen;
01215                int writerc = writestream->write(&inputs[streamIndex][offset], processStates[streamIndex].availableDataLen);
01216                if (writerc == 0)
01217                {
01218                   processStates[streamIndex].inIsOpen = false;
01219                   streams[streamIndex].in()->close();
01220                }
01221                else if (writerc == -1)
01222                {
01223                   BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: write error");
01224                }
01225                else
01226                {
01227                   inputs[streamIndex].erase(inputs[streamIndex].begin(), inputs[streamIndex].begin() + writerc);
01228                   input.getData(inputs[streamIndex], streams[streamIndex], streamIndex);
01229                   processStates[streamIndex].availableDataLen = inputs[streamIndex].size();
01230                }
01231             }
01232          }
01233          break;
01234       }
01235    }
01236 }
01237 
01238 } // end namespace Exec
01239 #endif
01240 } // end namespace BLOCXX_NAMESPACE
01241 

Generated on Fri Jun 16 15:39:08 2006 for blocxx by  doxygen 1.4.6