PosixUnnamedPipe.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2004 Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2005 Novell, Inc. All rights reserved.
00004 *
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 *
00008 *  - Redistributions of source code must retain the above copyright notice,
00009 *    this list of conditions and the following disclaimer.
00010 *
00011 *  - Redistributions in binary form must reproduce the above copyright notice,
00012 *    this list of conditions and the following disclaimer in the documentation
00013 *    and/or other materials provided with the distribution.
00014 *
00015 *  - Neither the name of Vintela, Inc., Novell, Inc., nor the names of its
00016 *    contributors may be used to endorse or promote products derived from this
00017 *    software without specific prior written permission.
00018 *
00019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00020 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00021 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00022 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc., Novell, Inc., OR THE 
00023 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00024 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00025 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
00026 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00027 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
00028 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
00029 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 *******************************************************************************/
00031 
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/PosixUnnamedPipe.hpp"
00039 #include "blocxx/AutoPtr.hpp"
00040 #include "blocxx/IOException.hpp"
00041 #include "blocxx/Format.hpp"
00042 #include "blocxx/SocketUtils.hpp"
00043 #include "blocxx/Assertion.hpp"
00044 
00045 extern "C"
00046 {
00047 #ifdef BLOCXX_WIN32
00048    #define _CLOSE ::_close
00049    #define _WRITE ::_write
00050    #define _READ ::_read
00051    #define _OPEN ::_open
00052    #include <io.h>
00053 #else
00054    #ifdef BLOCXX_HAVE_UNISTD_H
00055       #include <unistd.h>
00056    #endif
00057    #define _CLOSE ::close
00058    #define _WRITE ::write
00059    #define _READ ::read
00060    #define _OPEN ::open
00061 #endif
00062 
00063 #include <fcntl.h>
00064 #include <errno.h>
00065 }
00066 #include <cstring>
00067 
00068 namespace BLOCXX_NAMESPACE
00069 {
00070 
00071 #ifdef BLOCXX_NETWARE
00072 namespace
00073 {
00074 class AcceptThread
00075 {
00076 public:
00077    AcceptThread(int serversock)
00078       : m_serversock(serversock)
00079       , m_serverconn(-1)
00080    {
00081    }
00082 
00083    void acceptConnection();
00084    int getConnectFD() { return m_serverconn; }
00085 private:
00086    int m_serversock;
00087    int m_serverconn;
00088 };
00089 
00090 void
00091 AcceptThread::acceptConnection()
00092 {
00093     struct sockaddr_in sin;
00094    size_t val;
00095     int tmp = 1;
00096 
00097    tmp = 1;
00098    ::setsockopt(m_serversock, IPPROTO_TCP, 1,      // #define TCP_NODELAY 1
00099       (char*) &tmp, sizeof(int));
00100    
00101    val = sizeof(struct sockaddr_in);
00102    if ((m_serverconn = ::accept(m_serversock, (struct sockaddr*)&sin, &val))
00103       == -1)
00104    {
00105       return;
00106    }
00107    tmp = 1;
00108    ::setsockopt(m_serverconn, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
00109       (char *) &tmp, sizeof(int));
00110    tmp = 0;
00111    ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
00112              (char*) &tmp, sizeof(int));
00113 }
00114 
00115 void*
00116 runConnClass(void* arg)
00117 {
00118    AcceptThread* acceptThread = (AcceptThread*)(arg);
00119    acceptThread->acceptConnection();
00120    ::pthread_exit(NULL);
00121    return 0;
00122 }
00123 
00124 int
00125 _pipe(int *fds)
00126 {
00127    int svrfd, lerrno, connectfd;
00128    size_t val;
00129     struct sockaddr_in sin;
00130 
00131    svrfd = socket( AF_INET, SOCK_STREAM, 0 );
00132    sin.sin_family = AF_INET;
00133    sin.sin_addr.s_addr = htonl( 0x7f000001 ); // loopback
00134    sin.sin_port = 0;
00135    memset(sin.sin_zero, 0, 8 );
00136    if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1) 
00137    {
00138       int lerrno = errno;
00139 		::close(svrfd);
00140       fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
00141       return -1;
00142    }
00143    if (listen(svrfd, 1) == -1) 
00144    {
00145       int lerrno = errno;
00146 		::close(svrfd);
00147       return -1;
00148    }
00149    val = sizeof(struct sockaddr_in);
00150    if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1) 
00151    {
00152       int lerrno = errno;
00153       fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
00154 		::close(svrfd);
00155       return -1;
00156    }
00157 
00158    AcceptThread* pat = new AcceptThread(svrfd);
00159    pthread_t athread;
00160    // Start thread that will accept connection on svrfd.
00161    // Once a connection is made the thread will exit.
00162    pthread_create(&athread, NULL, runConnClass, pat);
00163 
00164    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
00165    if (clientfd == -1)
00166    {
00167       delete pat;
00168       return -1;
00169    }
00170 
00171    // Connect to server 
00172    struct sockaddr_in csin;
00173    csin.sin_family = AF_INET;
00174    csin.sin_addr.s_addr = htonl(0x7f000001); // loopback
00175    csin.sin_port = sin.sin_port;
00176    if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
00177    {
00178       delete pat;
00179       return -1;
00180    }
00181 
00182 #define TCP_NODELAY 1
00183    int tmp = 1;
00184    //
00185    // Set for Non-blocking writes and disable keepalive
00186    //
00187    ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
00188    tmp = 0;
00189    ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
00190 
00191    void* threadResult;
00192    // Wait for accept thread to terminate
00193    ::pthread_join(athread, &threadResult);
00194 
00195 	::close(svrfd);
00196    fds[0] = pat->getConnectFD();
00197    fds[1] = clientfd;
00198    delete pat;
00199    return 0;
00200 }
00201 }
00202 #endif // BLOCXX_NETWARE
00203 
00205 // STATIC
00206 UnnamedPipeRef
00207 UnnamedPipe::createUnnamedPipe(EOpen doOpen)
00208 {
00209    return UnnamedPipeRef(new PosixUnnamedPipe(doOpen));
00210 }
00212 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen)
00213 #ifndef BLOCXX_WIN32
00214    : m_blocking(E_BLOCKING)
00215 #endif
00216 {
00217 #ifdef BLOCXX_WIN32
00218    m_blocking[0] = m_blocking[1] = E_BLOCKING;
00219 #endif
00220    m_fds[0] = m_fds[1] = -1;
00221    if (doOpen)
00222    {
00223       open();
00224    }
00225    setTimeouts(60 * 10); // 10 minutes. This helps break deadlocks when using safePopen()
00226    setBlocking(E_BLOCKING); // necessary to set the pipes up right.
00227 }
00228    
00230 PosixUnnamedPipe::~PosixUnnamedPipe()
00231 {
00232    close();
00233 }
00235 void
00236 PosixUnnamedPipe::setBlocking(EBlockingMode outputIsBlocking)
00237 {
00238 #ifdef BLOCXX_WIN32
00239    // precondition
00240    BLOCXX_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00241 
00242    m_blocking[0] = outputIsBlocking;
00243    m_blocking[1] = outputIsBlocking;
00244    // Unnamed pipes on Win32 cannot do non-blocking i/o (aka async, overlapped)
00245    // Only named pipes can. If this becomes a problem in the future, then
00246    // PosixUnnamedPipe can be implemented with NamedPipes. I know this can be
00247    // a problem with the signal handling mechanism that is used in the daemon
00248    // code, but I plan on do that differently on Win32
00249 // BLOCXX_ASSERT(outputIsBlocking);
00250    return;
00251 #else
00252    // precondition
00253    BLOCXX_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00254 
00255    m_blocking = outputIsBlocking;
00256 
00257    for (size_t i = 0; i <= 1; ++i)
00258    {
00259       int fdflags = fcntl(m_fds[i], F_GETFL, 0);
00260       if (fdflags == -1)
00261       {
00262          BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00263       }
00264       if (outputIsBlocking == E_BLOCKING)
00265       {
00266          fdflags &= !O_NONBLOCK;
00267       }
00268       else
00269       {
00270          fdflags |= O_NONBLOCK;
00271       }
00272       if (fcntl(m_fds[i], F_SETFL, fdflags) == -1)
00273       {
00274          BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00275       }
00276    }
00277 
00278 #endif
00279 }
00281 void
00282 PosixUnnamedPipe::open()
00283 {
00284    if (m_fds[0] != -1)
00285    {
00286       close();
00287    }
00288 #if defined(BLOCXX_WIN32)
00289    HANDLE pipe = CreateNamedPipe( "\\\\.\\pipe\\TestPipe",
00290       PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED,
00291       PIPE_TYPE_MESSAGE,
00292       PIPE_UNLIMITED_INSTANCES,
00293       2560,
00294       2560,
00295       NMPWAIT_USE_DEFAULT_WAIT,
00296       NULL );
00297 
00298    HANDLE client = CreateFile( "\\\\.\\pipe\\TestPipe",
00299       GENERIC_READ,
00300       FILE_SHARE_READ,
00301       NULL,
00302       OPEN_EXISTING,
00303       FILE_FLAG_OVERLAPPED,
00304       NULL );
00305 
00306    HANDLE event1 = CreateEvent(NULL, TRUE, FALSE, NULL);
00307    HANDLE event2 = CreateEvent(NULL, TRUE, FALSE, NULL);
00308 
00309    // Should return immediately since the client connection is open.
00310    BOOL bConnected = ConnectNamedPipe( pipe, NULL );
00311    if( !bConnected && GetLastError() == ERROR_PIPE_CONNECTED )
00312       bConnected = TRUE;
00313 
00314    BOOL bSuccess = 
00315       pipe != INVALID_HANDLE_VALUE && 
00316       client != INVALID_HANDLE_VALUE && 
00317       event1 != INVALID_HANDLE_VALUE &&
00318       event2 != INVALID_HANDLE_VALUE &&
00319       bConnected;
00320 
00321    if( !bSuccess )
00322    {
00323       CloseHandle(pipe);
00324       CloseHandle(client);
00325       CloseHandle(event1);
00326       CloseHandle(event2);
00327    }
00328    else
00329    {
00330       m_fds[0] = (int)client;    // read descriptor
00331       m_fds[1] = (int)pipe;      // write descriptor
00332       m_events[0] = (int)event1;
00333       m_events[1] = (int)event2;
00334    }
00335 
00336    if( !bSuccess )
00337 // if (::_pipe(m_fds, 2560, _O_BINARY) == -1)
00338 #elif defined(BLOCXX_NETWARE)
00339    if (_pipe(m_fds) == -1)
00340 #else
00341    if (::pipe(m_fds) == -1)
00342 #endif
00343    {
00344       m_fds[0] = m_fds[1] = -1;
00345       BLOCXX_THROW(UnnamedPipeException, ::strerror(errno));
00346    }
00347 }
00349 int
00350 PosixUnnamedPipe::close()
00351 {
00352    int rc = -1;
00353    if (m_fds[0] != -1)
00354    {
00355 #ifdef BLOCXX_WIN32
00356       HANDLE h = (HANDLE)m_fds[0];
00357       HANDLE e = (HANDLE)m_events[0];
00358       if( CloseHandle(h) && CloseHandle(e) )
00359          rc = 0;
00360 #else
00361       rc = _CLOSE(m_fds[0]);
00362 #endif
00363       m_fds[0] = -1;
00364    }
00365    if (m_fds[1] != -1)
00366    {
00367 #ifdef BLOCXX_WIN32
00368       HANDLE h = (HANDLE)m_fds[1];
00369       HANDLE e = (HANDLE)m_events[1];
00370       if( CloseHandle(h) && CloseHandle(e) )
00371          rc = 0;
00372 #else
00373       rc = _CLOSE(m_fds[1]);
00374 #endif
00375       m_fds[1] = -1;
00376    }
00377    return rc;
00378 }
00380 bool
00381 PosixUnnamedPipe::isOpen() const
00382 {
00383    return (m_fds[0] != -1) || (m_fds[1] != -1);
00384 }
00385 
00387 int
00388 PosixUnnamedPipe::closeInputHandle()
00389 {
00390    int rc = -1;
00391    if (m_fds[0] != -1)
00392    {
00393 #ifdef BLOCXX_WIN32
00394       HANDLE h = (HANDLE)m_fds[0];
00395       HANDLE e = (HANDLE)m_events[0];
00396       if( CloseHandle(h) && CloseHandle(e) )
00397          rc = 0;
00398 #else
00399       rc = _CLOSE(m_fds[0]);
00400 #endif
00401       m_fds[0] = -1;
00402    }
00403    return rc;
00404 }
00406 int
00407 PosixUnnamedPipe::closeOutputHandle()
00408 {
00409    int rc = -1;
00410    if (m_fds[1] != -1)
00411    {
00412 #ifdef BLOCXX_WIN32
00413       HANDLE h = (HANDLE)m_fds[1];
00414       HANDLE e = (HANDLE)m_events[1];
00415       if( CloseHandle(h) && CloseHandle(e) )
00416          rc = 0;
00417 #else
00418       rc = _CLOSE(m_fds[1]);
00419 #endif
00420       m_fds[1] = -1;
00421    }
00422    return rc;
00423 }
00425 int
00426 PosixUnnamedPipe::write(const void* data, int dataLen, bool errorAsException)
00427 {
00428    int rc = -1;
00429    if (m_fds[1] != -1)
00430    {
00431 #ifndef BLOCXX_WIN32
00432       if (m_blocking == E_BLOCKING)
00433       {
00434          rc = SocketUtils::waitForIO(m_fds[1], m_writeTimeout, SocketFlags::E_WAIT_FOR_OUTPUT);
00435          if (rc != 0)
00436          {
00437             if (errorAsException)
00438             {
00439                BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed."); 
00440             }
00441             else
00442             {
00443                return rc;
00444             }
00445          }
00446       }
00447       rc = _WRITE(m_fds[1], data, dataLen);
00448 #else
00449       BOOL bSuccess = FALSE;
00450 
00451       OVERLAPPED ovl;
00452 
00453       ovl.hEvent = (HANDLE)m_events[1];
00454       ovl.Offset = 0;
00455       ovl.OffsetHigh = 0;
00456 
00457       bSuccess = WriteFile(
00458          (HANDLE)m_fds[1],
00459          data,
00460          dataLen,
00461          NULL,
00462          &ovl);
00463 
00464       if( bSuccess && m_blocking[1] == E_BLOCKING )
00465       {
00466          bSuccess = WaitForSingleObject( (HANDLE)m_events[1], INFINITE ) == WAIT_OBJECT_0;
00467       }
00468 
00469       if( bSuccess )
00470          rc = 0;
00471 #endif
00472    }
00473    if (errorAsException && rc == -1)
00474    {
00475       BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed.");
00476    }
00477    return rc;
00478 }
00480 int
00481 PosixUnnamedPipe::read(void* buffer, int bufferLen, bool errorAsException)
00482 {
00483    int rc = -1;
00484    if (m_fds[0] != -1)
00485    {
00486 #ifndef BLOCXX_WIN32
00487       if (m_blocking == E_BLOCKING)
00488       {
00489          rc = SocketUtils::waitForIO(m_fds[0], m_readTimeout, SocketFlags::E_WAIT_FOR_INPUT);
00490          if (rc != 0)
00491          {
00492             if (errorAsException)
00493             {
00494                BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00495             }
00496             else
00497             {
00498                return rc;
00499             }
00500          }
00501       }
00502       rc = _READ(m_fds[0], buffer, bufferLen);
00503 #else
00504       BOOL bSuccess = FALSE;
00505 
00506       OVERLAPPED ovl;
00507 
00508       ovl.hEvent = (HANDLE)m_events[0];
00509       ovl.Offset = 0;
00510       ovl.OffsetHigh = 0;
00511 
00512       bSuccess = ReadFile(
00513          (HANDLE)m_fds[0],
00514          buffer,
00515          bufferLen,
00516          NULL,
00517          &ovl);
00518 
00519       if( bSuccess && m_blocking[0] == E_BLOCKING )
00520       {
00521          bSuccess = WaitForSingleObject( (HANDLE)m_events[0], INFINITE ) == WAIT_OBJECT_0;
00522       }
00523 
00524       if( bSuccess )
00525          rc = 0;
00526 #endif
00527    }
00528    if (errorAsException && rc == -1)
00529    {
00530       BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed.");
00531    }
00532    return rc;
00533 }
00535 Select_t
00536 PosixUnnamedPipe::getSelectObj() const
00537 {
00538 #ifdef BLOCXX_WIN32
00539    Select_t selectObj;
00540    selectObj.event = (HANDLE)m_events[0];
00541    selectObj.sockfd = INVALID_SOCKET;
00542    selectObj.networkevents = 0;
00543    selectObj.doreset = false;
00544 
00545    return selectObj;
00546 #else
00547    return m_fds[0];
00548 #endif
00549 }
00550 
00552 Select_t
00553 PosixUnnamedPipe::getWriteSelectObj() const
00554 {
00555 #ifdef BLOCXX_WIN32
00556    Select_t selectObj;
00557    selectObj.event = (HANDLE)m_events[1];
00558    selectObj.sockfd = INVALID_SOCKET;
00559    selectObj.networkevents = 0;
00560    selectObj.doreset = false;
00561 
00562    return selectObj;
00563 #else
00564    return m_fds[1];
00565 #endif
00566 }
00567 
00568 } // end namespace BLOCXX_NAMESPACE
00569 

Generated on Fri Jun 16 15:39:08 2006 for blocxx by  doxygen 1.4.6