SocketBaseImpl.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2004 Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2005 Novell, Inc. All rights reserved.
00004 *
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 *
00008 *  - Redistributions of source code must retain the above copyright notice,
00009 *    this list of conditions and the following disclaimer.
00010 *
00011 *  - Redistributions in binary form must reproduce the above copyright notice,
00012 *    this list of conditions and the following disclaimer in the documentation
00013 *    and/or other materials provided with the distribution.
00014 *
00015 *  - Neither the name of Vintela, Inc., Novell, Inc., nor the names of its
00016 *    contributors may be used to endorse or promote products derived from this
00017 *    software without specific prior written permission.
00018 *
00019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00020 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00021 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00022 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc., Novell, Inc., OR THE 
00023 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00024 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00025 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
00026 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00027 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
00028 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
00029 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 *******************************************************************************/
00031 
00032 
00037 #include "blocxx/BLOCXX_config.h"
00038 
00039 #if !defined(BLOCXX_WIN32)
00040 
00041 #include "blocxx/SocketBaseImpl.hpp"
00042 #include "blocxx/SocketUtils.hpp"
00043 #include "blocxx/Format.hpp"
00044 #include "blocxx/Assertion.hpp"
00045 #include "blocxx/IOException.hpp"
00046 #include "blocxx/Mutex.hpp"
00047 #include "blocxx/MutexLock.hpp"
00048 #include "blocxx/PosixUnnamedPipe.hpp"
00049 #include "blocxx/Socket.hpp"
00050 #include "blocxx/Thread.hpp"
00051 #include "blocxx/DateTime.hpp"
00052 
00053 extern "C"
00054 {
00055 #ifdef BLOCXX_HAVE_SYS_POLL_H
00056  #include <sys/poll.h>
00057 #elif defined (BLOCXX_HAVE_SYS_SELECT_H)
00058  #include <sys/select.h>
00059 #else 
00060  #error "port me!"
00061 #endif
00062 
00063 #include <sys/types.h>
00064 #include <sys/time.h>
00065 #include <sys/socket.h>
00066 #include <sys/stat.h>
00067 #include <netdb.h>
00068 #include <arpa/inet.h>
00069 #include <unistd.h>
00070 #include <fcntl.h>
00071 #include <netinet/in.h>
00072 }
00073 
00074 #include <fstream>
00075 #include <cerrno>
00076 #include <cstdio>
00077 
00078 namespace BLOCXX_NAMESPACE
00079 {
00080 
00081 using std::istream;
00082 using std::ostream;
00083 using std::iostream;
00084 using std::ifstream;
00085 using std::ofstream;
00086 using std::fstream;
00087 using std::ios;
00088 String SocketBaseImpl::m_traceFileOut;
00089 String SocketBaseImpl::m_traceFileIn;
00090 
00092 SocketBaseImpl::SocketBaseImpl()
00093    : SelectableIFC()
00094    , IOIFC()
00095    , m_isConnected(false)
00096    , m_sockfd(-1)
00097    , m_localAddress()
00098    , m_peerAddress()
00099    , m_recvTimeoutExprd(false)
00100    , m_streamBuf(this)
00101    , m_in(&m_streamBuf)
00102    , m_out(&m_streamBuf)
00103    , m_inout(&m_streamBuf)
00104    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00105    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00106    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00107 {
00108    m_out.exceptions(std::ios::badbit);
00109    m_inout.exceptions(std::ios::badbit);
00110 }
00112 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00113       SocketAddress::AddressType addrType)
00114    : SelectableIFC()
00115    , IOIFC()
00116    , m_isConnected(true)
00117    , m_sockfd(fd)
00118    , m_localAddress(SocketAddress::getAnyLocalHost())
00119    , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00120    , m_recvTimeoutExprd(false)
00121    , m_streamBuf(this)
00122    , m_in(&m_streamBuf)
00123    , m_out(&m_streamBuf)
00124    , m_inout(&m_streamBuf)
00125    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00126    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00127    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00128 {
00129    m_out.exceptions(std::ios::badbit);
00130    m_inout.exceptions(std::ios::badbit);
00131    if (addrType == SocketAddress::INET)
00132    {
00133       fillInetAddrParms();
00134    }
00135    else if (addrType == SocketAddress::UDS)
00136    {
00137       fillUnixAddrParms();
00138    }
00139    else
00140    {
00141       BLOCXX_ASSERT(0);
00142    }
00143 }
00145 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00146    : SelectableIFC()
00147    , IOIFC()
00148    , m_isConnected(false)
00149    , m_sockfd(-1)
00150    , m_localAddress(SocketAddress::getAnyLocalHost())
00151    , m_peerAddress(addr)
00152    , m_recvTimeoutExprd(false)
00153    , m_streamBuf(this)
00154    , m_in(&m_streamBuf)
00155    , m_out(&m_streamBuf)
00156    , m_inout(&m_streamBuf)
00157    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00158    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00159    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00160 {
00161    m_out.exceptions(std::ios::badbit);
00162    m_inout.exceptions(std::ios::badbit);
00163    connect(m_peerAddress);
00164 }
00166 SocketBaseImpl::~SocketBaseImpl()
00167 {
00168    try
00169    {
00170       disconnect();
00171    }
00172    catch (...)
00173    {
00174       // don't let exceptions escape
00175    }
00176 }
00178 Select_t
00179 SocketBaseImpl::getSelectObj() const
00180 {
00181    return m_sockfd;
00182 }
00184 void
00185 SocketBaseImpl::connect(const SocketAddress& addr)
00186 {
00187    if (m_isConnected)
00188    {
00189       disconnect();
00190    }
00191    m_streamBuf.reset();
00192    m_in.clear();
00193    m_out.clear();
00194    m_inout.clear();
00195    BLOCXX_ASSERT(addr.getType() == SocketAddress::INET
00196          || addr.getType() == SocketAddress::UDS);
00197    if ((m_sockfd = ::socket(addr.getType() == SocketAddress::INET ?
00198       AF_INET : PF_UNIX, SOCK_STREAM, 0)) == -1)
00199    {
00200       BLOCXX_THROW_ERRNO_MSG(SocketException,
00201          "Failed to create a socket");
00202    }
00203    // set the close on exec flag so child process can't keep the socket.
00204    if (::fcntl(m_sockfd, F_SETFD, FD_CLOEXEC) == -1)
00205    {
00206       ::close(m_sockfd);
00207       BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00208    }
00209    int n;
00210    int flags = ::fcntl(m_sockfd, F_GETFL, 0);
00211    ::fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
00212    if ((n = ::connect(m_sockfd, addr.getNativeForm(),
00213                addr.getNativeFormSize())) < 0)
00214    {
00215       if (errno != EINPROGRESS)
00216       {
00217 			::close(m_sockfd);
00218          BLOCXX_THROW_ERRNO_MSG(SocketException,
00219             Format("Failed to connect to: %1", addr.toString()).c_str());
00220       }
00221    }
00222    if (n == -1)
00223    {
00224       // because of the above check for EINPROGRESS
00225       // not connected yet, need to select and wait for connection to complete.
00226       PosixUnnamedPipeRef lUPipe;
00227       int pipefd = -1;
00228       if (Socket::getShutDownMechanism())
00229       {
00230          UnnamedPipeRef foo = Socket::getShutDownMechanism();
00231          lUPipe = foo.cast_to<PosixUnnamedPipe>();
00232          BLOCXX_ASSERT(lUPipe);
00233          pipefd = lUPipe->getInputHandle();
00234       }
00235 
00236 #ifdef BLOCXX_HAVE_SYS_POLL_H
00237       struct pollfd pfds[2]; 
00238 #else
00239       fd_set rset, wset;
00240 #endif
00241       // here we spin checking for thread cancellation every so often.
00242       UInt32 remainingMsWait = m_connectTimeout != Socket::INFINITE_TIMEOUT ? m_connectTimeout * 1000 : ~0U;
00243       do
00244       {
00245 #ifdef BLOCXX_HAVE_SYS_POLL_H
00246          int numfds = 0; 
00247          if (pipefd != -1)
00248          {
00249             numfds = 2; 
00250             pfds[1].fd = pipefd; 
00251             pfds[1].events = POLLIN | POLLPRI; 
00252          }
00253          else
00254          {
00255             numfds = 1; 
00256          }
00257          pfds[0].fd = m_sockfd; 
00258          pfds[0].events = POLLIN | POLLPRI | POLLOUT; 
00259          const UInt32 waitMs = 100; // 1/10 of a second
00260          int waitTime = std::min((waitMs % 1000), remainingMsWait); 
00261          Thread::testCancel();
00262          n = ::poll(pfds, numfds, waitTime); 
00263 #else
00264          FD_ZERO(&rset);
00265          FD_SET(m_sockfd, &rset);
00266          if (pipefd != -1)
00267          {
00268             FD_SET(pipefd, &rset);
00269          }
00270          FD_ZERO(&wset);
00271          FD_SET(m_sockfd, &wset);
00272          int maxfd = m_sockfd > pipefd ? m_sockfd : pipefd;
00273 
00274          const UInt32 waitMs = 100; // 1/10 of a second
00275          struct timeval tv;
00276          tv.tv_sec = 0;
00277          tv.tv_usec = std::min((waitMs % 1000), remainingMsWait) * 1000;
00278 
00279          Thread::testCancel();
00280          n = ::select(maxfd+1, &rset, &wset, NULL, &tv);
00281 #endif
00282 
00283          if (m_connectTimeout != Socket::INFINITE_TIMEOUT)
00284          {
00285             remainingMsWait -= std::min(waitMs, remainingMsWait);
00286          }
00287       } while (n == 0 && remainingMsWait > 0);
00288 
00289       if (n == 0)
00290       {
00291 			::close(m_sockfd);
00292          BLOCXX_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00293       }
00294       else if (n == -1)
00295       {
00296 			::close(m_sockfd);
00297          if (errno == EINTR)
00298          {
00299             Thread::testCancel();
00300          }
00301          BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00302       }
00303 #ifdef BLOCXX_HAVE_SYS_POLL_H
00304       if (pipefd != -1 
00305             && (pfds[1].revents & POLLPRI || pfds[1].revents & POLLIN))
00306 #else
00307       if (pipefd != -1 && FD_ISSET(pipefd, &rset))
00308 #endif
00309       {
00310 			::close(m_sockfd);
00311          BLOCXX_THROW(SocketException, "Sockets have been shutdown");
00312       }
00313 #ifdef BLOCXX_HAVE_SYS_POLL_H
00314       else if (pfds[0].revents & POLLOUT || pfds[0].revents & POLLPRI
00315                || pfds[0].revents & POLLIN)
00316 #else
00317       else if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
00318 #endif
00319       {
00320          int error = 0;
00321          socklen_t len = sizeof(error);
00322          if (::getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &error,
00323                   &len) < 0)
00324          {
00325 				::close(m_sockfd);
00326             BLOCXX_THROW_ERRNO_MSG(SocketException,
00327                   "SocketBaseImpl::connect() getsockopt() failed");
00328          }
00329          if (error != 0)
00330          {
00331 				::close(m_sockfd);
00332             errno = error;
00333             BLOCXX_THROW_ERRNO_MSG(SocketException,
00334                   "SocketBaseImpl::connect() failed");
00335          }
00336       }
00337       else
00338       {
00339 			::close(m_sockfd);
00340          BLOCXX_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, m_sockfd not in FD set.");
00341       }
00342    }
00343    ::fcntl(m_sockfd, F_SETFL, flags);
00344    m_isConnected = true;
00345    m_peerAddress = addr;   // assign hostname. 
00346    if (addr.getType() == SocketAddress::INET)
00347    {
00348       fillInetAddrParms();
00349    }
00350    else if (addr.getType() == SocketAddress::UDS)
00351    {
00352       fillUnixAddrParms();
00353    }
00354    else
00355    {
00356       BLOCXX_ASSERT(0);
00357    }
00358 }
00360 void
00361 SocketBaseImpl::disconnect()
00362 {
00363    if (m_in)
00364    {
00365       m_in.clear(ios::eofbit);
00366    }
00367    if (m_out)
00368    {
00369       m_out.clear(ios::eofbit);
00370    }
00371    if (m_inout)
00372    {
00373       m_inout.clear(ios::eofbit);
00374    }
00375    if (m_sockfd != -1 && m_isConnected)
00376    {
00377 		::close(m_sockfd);
00378       m_isConnected = false;
00379       m_sockfd = -1;
00380    }
00381 }
00383 // JBW this needs reworked.
00384 void
00385 SocketBaseImpl::fillInetAddrParms()
00386 {
00387    socklen_t len;
00388    InetSocketAddress_t addr;
00389    memset(&addr, 0, sizeof(addr));
00390    len = sizeof(addr);
00391    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00392    {
00393 // Don't error out here, we can still operate without working DNS.
00394 //    BLOCXX_THROW_ERRNO_MSG(SocketException,
00395 //          "SocketBaseImpl::fillInetAddrParms: getsockname");
00396    }
00397    else
00398    {
00399       m_localAddress.assignFromNativeForm(&addr, len);
00400    }
00401    len = sizeof(addr);
00402    if (getpeername(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00403    {
00404 // Don't error out here, we can still operate without working DNS.
00405 //    BLOCXX_THROW_ERRNO_MSG(SocketException,
00406 //          "SocketBaseImpl::fillInetAddrParms: getpeername");
00407    }
00408    else
00409    {
00410       m_peerAddress.assignFromNativeForm(&addr, len);
00411    }
00412 }
00414 void
00415 SocketBaseImpl::fillUnixAddrParms()
00416 {
00417    socklen_t len;
00418    UnixSocketAddress_t addr;
00419    memset(&addr, 0, sizeof(addr));
00420    len = sizeof(addr);
00421    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00422    {
00423       BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00424    }
00425    m_localAddress.assignFromNativeForm(&addr, len);
00426    m_peerAddress.assignFromNativeForm(&addr, len);
00427 }
00428 static Mutex guard;
00430 int
00431 SocketBaseImpl::write(const void* dataOut, int dataOutLen, bool errorAsException)
00432 {
00433    int rc = 0;
00434    bool isError = false;
00435    if (m_isConnected)
00436    {
00437       isError = waitForOutput(m_sendTimeout);
00438       if (isError)
00439       {
00440          rc = -1;
00441       }
00442       else
00443       {
00444          rc = writeAux(dataOut, dataOutLen);
00445          if (!m_traceFileOut.empty() && rc > 0)
00446          {
00447             MutexLock ml(guard);
00448             ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00449             if (!traceFile)
00450             {
00451                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00452             }
00453             if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00454             {
00455                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00456             }
00457 
00458             ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00459             if (!comboTraceFile)
00460             {
00461                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00462             }
00463             DateTime curDateTime;
00464             curDateTime.setToCurrent();
00465             comboTraceFile << "\n--->Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00466                '.' << curDateTime.getMicrosecond() << "<---\n";
00467             if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00468             {
00469                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00470             }
00471          }
00472       }
00473    }
00474    else
00475    {
00476       rc = -1;
00477    }
00478    if (rc < 0 && errorAsException)
00479    {
00480       BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00481    }
00482    return rc;
00483 }
00485 int
00486 SocketBaseImpl::read(void* dataIn, int dataInLen, bool errorAsException)   
00487 {
00488    int rc = 0;
00489    bool isError = false;
00490    if (m_isConnected)
00491    {
00492       isError = waitForInput(m_recvTimeout);
00493       if (isError)
00494       {
00495          rc = -1;
00496       }
00497       else
00498       {
00499          rc = readAux(dataIn, dataInLen);
00500          if (!m_traceFileIn.empty() && rc > 0)
00501          {
00502             MutexLock ml(guard);
00503             ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00504             if (!traceFile)
00505             {
00506                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening tracefile");
00507             }
00508             if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00509             {
00510                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00511             }
00512 
00513             ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00514             if (!comboTraceFile)
00515             {
00516                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00517             }
00518             DateTime curDateTime;
00519             curDateTime.setToCurrent();
00520             comboTraceFile << "\n--->In " << rc << " bytes at " << curDateTime.toString("%X") <<
00521                '.' << curDateTime.getMicrosecond() << "<---\n";
00522             if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00523             {
00524                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00525             }
00526          }
00527       }
00528    }
00529    else
00530    {
00531       rc = -1;
00532    }
00533    if (rc < 0)
00534    {
00535       if (errorAsException)
00536       {
00537          BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00538       }
00539    }
00540    return rc;
00541 }
00543 bool
00544 SocketBaseImpl::waitForInput(int timeOutSecs)
00545 {
00546    int rval = SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_INPUT);
00547    if (rval == ETIMEDOUT)
00548    {
00549       m_recvTimeoutExprd = true;
00550    }
00551    else
00552    {
00553       m_recvTimeoutExprd = false;
00554    }
00555    return (rval != 0);
00556 }
00558 bool
00559 SocketBaseImpl::waitForOutput(int timeOutSecs)
00560 {
00561    return SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00562 }
00564 istream&
00565 SocketBaseImpl::getInputStream()
00566 {
00567    return m_in;
00568 }
00570 ostream&
00571 SocketBaseImpl::getOutputStream()
00572 {
00573    return m_out;
00574 }
00576 iostream&
00577 SocketBaseImpl::getIOStream()
00578 {
00579    return m_inout;
00580 }
00582 // STATIC
00583 void
00584 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00585 {
00586    m_traceFileOut = out;
00587    m_traceFileIn = in;
00588 }
00589 
00590 } // end namespace BLOCXX_NAMESPACE
00591 
00592 #endif   // #if !defined(BLOCXX_WIN32)
00593 

Generated on Fri Jun 16 15:39:09 2006 for blocxx by  doxygen 1.4.6