00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00037 #include "blocxx/BLOCXX_config.h"
00038
00039 #if !defined(BLOCXX_WIN32)
00040
00041 #include "blocxx/SocketBaseImpl.hpp"
00042 #include "blocxx/SocketUtils.hpp"
00043 #include "blocxx/Format.hpp"
00044 #include "blocxx/Assertion.hpp"
00045 #include "blocxx/IOException.hpp"
00046 #include "blocxx/Mutex.hpp"
00047 #include "blocxx/MutexLock.hpp"
00048 #include "blocxx/PosixUnnamedPipe.hpp"
00049 #include "blocxx/Socket.hpp"
00050 #include "blocxx/Thread.hpp"
00051 #include "blocxx/DateTime.hpp"
00052
00053 extern "C"
00054 {
00055 #ifdef BLOCXX_HAVE_SYS_POLL_H
00056 #include <sys/poll.h>
00057 #elif defined (BLOCXX_HAVE_SYS_SELECT_H)
00058 #include <sys/select.h>
00059 #else
00060 #error "port me!"
00061 #endif
00062
00063 #include <sys/types.h>
00064 #include <sys/time.h>
00065 #include <sys/socket.h>
00066 #include <sys/stat.h>
00067 #include <netdb.h>
00068 #include <arpa/inet.h>
00069 #include <unistd.h>
00070 #include <fcntl.h>
00071 #include <netinet/in.h>
00072 }
00073
00074 #include <fstream>
00075 #include <cerrno>
00076 #include <cstdio>
00077
00078 namespace BLOCXX_NAMESPACE
00079 {
00080
00081 using std::istream;
00082 using std::ostream;
00083 using std::iostream;
00084 using std::ifstream;
00085 using std::ofstream;
00086 using std::fstream;
00087 using std::ios;
00088 String SocketBaseImpl::m_traceFileOut;
00089 String SocketBaseImpl::m_traceFileIn;
00090
00092 SocketBaseImpl::SocketBaseImpl()
00093 : SelectableIFC()
00094 , IOIFC()
00095 , m_isConnected(false)
00096 , m_sockfd(-1)
00097 , m_localAddress()
00098 , m_peerAddress()
00099 , m_recvTimeoutExprd(false)
00100 , m_streamBuf(this)
00101 , m_in(&m_streamBuf)
00102 , m_out(&m_streamBuf)
00103 , m_inout(&m_streamBuf)
00104 , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00105 , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00106 , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00107 {
00108 m_out.exceptions(std::ios::badbit);
00109 m_inout.exceptions(std::ios::badbit);
00110 }
00112 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00113 SocketAddress::AddressType addrType)
00114 : SelectableIFC()
00115 , IOIFC()
00116 , m_isConnected(true)
00117 , m_sockfd(fd)
00118 , m_localAddress(SocketAddress::getAnyLocalHost())
00119 , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00120 , m_recvTimeoutExprd(false)
00121 , m_streamBuf(this)
00122 , m_in(&m_streamBuf)
00123 , m_out(&m_streamBuf)
00124 , m_inout(&m_streamBuf)
00125 , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00126 , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00127 , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00128 {
00129 m_out.exceptions(std::ios::badbit);
00130 m_inout.exceptions(std::ios::badbit);
00131 if (addrType == SocketAddress::INET)
00132 {
00133 fillInetAddrParms();
00134 }
00135 else if (addrType == SocketAddress::UDS)
00136 {
00137 fillUnixAddrParms();
00138 }
00139 else
00140 {
00141 BLOCXX_ASSERT(0);
00142 }
00143 }
00145 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00146 : SelectableIFC()
00147 , IOIFC()
00148 , m_isConnected(false)
00149 , m_sockfd(-1)
00150 , m_localAddress(SocketAddress::getAnyLocalHost())
00151 , m_peerAddress(addr)
00152 , m_recvTimeoutExprd(false)
00153 , m_streamBuf(this)
00154 , m_in(&m_streamBuf)
00155 , m_out(&m_streamBuf)
00156 , m_inout(&m_streamBuf)
00157 , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00158 , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00159 , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00160 {
00161 m_out.exceptions(std::ios::badbit);
00162 m_inout.exceptions(std::ios::badbit);
00163 connect(m_peerAddress);
00164 }
00166 SocketBaseImpl::~SocketBaseImpl()
00167 {
00168 try
00169 {
00170 disconnect();
00171 }
00172 catch (...)
00173 {
00174
00175 }
00176 }
00178 Select_t
00179 SocketBaseImpl::getSelectObj() const
00180 {
00181 return m_sockfd;
00182 }
00184 void
00185 SocketBaseImpl::connect(const SocketAddress& addr)
00186 {
00187 if (m_isConnected)
00188 {
00189 disconnect();
00190 }
00191 m_streamBuf.reset();
00192 m_in.clear();
00193 m_out.clear();
00194 m_inout.clear();
00195 BLOCXX_ASSERT(addr.getType() == SocketAddress::INET
00196 || addr.getType() == SocketAddress::UDS);
00197 if ((m_sockfd = ::socket(addr.getType() == SocketAddress::INET ?
00198 AF_INET : PF_UNIX, SOCK_STREAM, 0)) == -1)
00199 {
00200 BLOCXX_THROW_ERRNO_MSG(SocketException,
00201 "Failed to create a socket");
00202 }
00203
00204 if (::fcntl(m_sockfd, F_SETFD, FD_CLOEXEC) == -1)
00205 {
00206 ::close(m_sockfd);
00207 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00208 }
00209 int n;
00210 int flags = ::fcntl(m_sockfd, F_GETFL, 0);
00211 ::fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
00212 if ((n = ::connect(m_sockfd, addr.getNativeForm(),
00213 addr.getNativeFormSize())) < 0)
00214 {
00215 if (errno != EINPROGRESS)
00216 {
00217 ::close(m_sockfd);
00218 BLOCXX_THROW_ERRNO_MSG(SocketException,
00219 Format("Failed to connect to: %1", addr.toString()).c_str());
00220 }
00221 }
00222 if (n == -1)
00223 {
00224
00225
00226 PosixUnnamedPipeRef lUPipe;
00227 int pipefd = -1;
00228 if (Socket::getShutDownMechanism())
00229 {
00230 UnnamedPipeRef foo = Socket::getShutDownMechanism();
00231 lUPipe = foo.cast_to<PosixUnnamedPipe>();
00232 BLOCXX_ASSERT(lUPipe);
00233 pipefd = lUPipe->getInputHandle();
00234 }
00235
00236 #ifdef BLOCXX_HAVE_SYS_POLL_H
00237 struct pollfd pfds[2];
00238 #else
00239 fd_set rset, wset;
00240 #endif
00241
00242 UInt32 remainingMsWait = m_connectTimeout != Socket::INFINITE_TIMEOUT ? m_connectTimeout * 1000 : ~0U;
00243 do
00244 {
00245 #ifdef BLOCXX_HAVE_SYS_POLL_H
00246 int numfds = 0;
00247 if (pipefd != -1)
00248 {
00249 numfds = 2;
00250 pfds[1].fd = pipefd;
00251 pfds[1].events = POLLIN | POLLPRI;
00252 }
00253 else
00254 {
00255 numfds = 1;
00256 }
00257 pfds[0].fd = m_sockfd;
00258 pfds[0].events = POLLIN | POLLPRI | POLLOUT;
00259 const UInt32 waitMs = 100;
00260 int waitTime = std::min((waitMs % 1000), remainingMsWait);
00261 Thread::testCancel();
00262 n = ::poll(pfds, numfds, waitTime);
00263 #else
00264 FD_ZERO(&rset);
00265 FD_SET(m_sockfd, &rset);
00266 if (pipefd != -1)
00267 {
00268 FD_SET(pipefd, &rset);
00269 }
00270 FD_ZERO(&wset);
00271 FD_SET(m_sockfd, &wset);
00272 int maxfd = m_sockfd > pipefd ? m_sockfd : pipefd;
00273
00274 const UInt32 waitMs = 100;
00275 struct timeval tv;
00276 tv.tv_sec = 0;
00277 tv.tv_usec = std::min((waitMs % 1000), remainingMsWait) * 1000;
00278
00279 Thread::testCancel();
00280 n = ::select(maxfd+1, &rset, &wset, NULL, &tv);
00281 #endif
00282
00283 if (m_connectTimeout != Socket::INFINITE_TIMEOUT)
00284 {
00285 remainingMsWait -= std::min(waitMs, remainingMsWait);
00286 }
00287 } while (n == 0 && remainingMsWait > 0);
00288
00289 if (n == 0)
00290 {
00291 ::close(m_sockfd);
00292 BLOCXX_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00293 }
00294 else if (n == -1)
00295 {
00296 ::close(m_sockfd);
00297 if (errno == EINTR)
00298 {
00299 Thread::testCancel();
00300 }
00301 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00302 }
00303 #ifdef BLOCXX_HAVE_SYS_POLL_H
00304 if (pipefd != -1
00305 && (pfds[1].revents & POLLPRI || pfds[1].revents & POLLIN))
00306 #else
00307 if (pipefd != -1 && FD_ISSET(pipefd, &rset))
00308 #endif
00309 {
00310 ::close(m_sockfd);
00311 BLOCXX_THROW(SocketException, "Sockets have been shutdown");
00312 }
00313 #ifdef BLOCXX_HAVE_SYS_POLL_H
00314 else if (pfds[0].revents & POLLOUT || pfds[0].revents & POLLPRI
00315 || pfds[0].revents & POLLIN)
00316 #else
00317 else if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
00318 #endif
00319 {
00320 int error = 0;
00321 socklen_t len = sizeof(error);
00322 if (::getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &error,
00323 &len) < 0)
00324 {
00325 ::close(m_sockfd);
00326 BLOCXX_THROW_ERRNO_MSG(SocketException,
00327 "SocketBaseImpl::connect() getsockopt() failed");
00328 }
00329 if (error != 0)
00330 {
00331 ::close(m_sockfd);
00332 errno = error;
00333 BLOCXX_THROW_ERRNO_MSG(SocketException,
00334 "SocketBaseImpl::connect() failed");
00335 }
00336 }
00337 else
00338 {
00339 ::close(m_sockfd);
00340 BLOCXX_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, m_sockfd not in FD set.");
00341 }
00342 }
00343 ::fcntl(m_sockfd, F_SETFL, flags);
00344 m_isConnected = true;
00345 m_peerAddress = addr;
00346 if (addr.getType() == SocketAddress::INET)
00347 {
00348 fillInetAddrParms();
00349 }
00350 else if (addr.getType() == SocketAddress::UDS)
00351 {
00352 fillUnixAddrParms();
00353 }
00354 else
00355 {
00356 BLOCXX_ASSERT(0);
00357 }
00358 }
00360 void
00361 SocketBaseImpl::disconnect()
00362 {
00363 if (m_in)
00364 {
00365 m_in.clear(ios::eofbit);
00366 }
00367 if (m_out)
00368 {
00369 m_out.clear(ios::eofbit);
00370 }
00371 if (m_inout)
00372 {
00373 m_inout.clear(ios::eofbit);
00374 }
00375 if (m_sockfd != -1 && m_isConnected)
00376 {
00377 ::close(m_sockfd);
00378 m_isConnected = false;
00379 m_sockfd = -1;
00380 }
00381 }
00383
00384 void
00385 SocketBaseImpl::fillInetAddrParms()
00386 {
00387 socklen_t len;
00388 InetSocketAddress_t addr;
00389 memset(&addr, 0, sizeof(addr));
00390 len = sizeof(addr);
00391 if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00392 {
00393
00394
00395
00396 }
00397 else
00398 {
00399 m_localAddress.assignFromNativeForm(&addr, len);
00400 }
00401 len = sizeof(addr);
00402 if (getpeername(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00403 {
00404
00405
00406
00407 }
00408 else
00409 {
00410 m_peerAddress.assignFromNativeForm(&addr, len);
00411 }
00412 }
00414 void
00415 SocketBaseImpl::fillUnixAddrParms()
00416 {
00417 socklen_t len;
00418 UnixSocketAddress_t addr;
00419 memset(&addr, 0, sizeof(addr));
00420 len = sizeof(addr);
00421 if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00422 {
00423 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00424 }
00425 m_localAddress.assignFromNativeForm(&addr, len);
00426 m_peerAddress.assignFromNativeForm(&addr, len);
00427 }
00428 static Mutex guard;
00430 int
00431 SocketBaseImpl::write(const void* dataOut, int dataOutLen, bool errorAsException)
00432 {
00433 int rc = 0;
00434 bool isError = false;
00435 if (m_isConnected)
00436 {
00437 isError = waitForOutput(m_sendTimeout);
00438 if (isError)
00439 {
00440 rc = -1;
00441 }
00442 else
00443 {
00444 rc = writeAux(dataOut, dataOutLen);
00445 if (!m_traceFileOut.empty() && rc > 0)
00446 {
00447 MutexLock ml(guard);
00448 ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00449 if (!traceFile)
00450 {
00451 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00452 }
00453 if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00454 {
00455 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00456 }
00457
00458 ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00459 if (!comboTraceFile)
00460 {
00461 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00462 }
00463 DateTime curDateTime;
00464 curDateTime.setToCurrent();
00465 comboTraceFile << "\n--->Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00466 '.' << curDateTime.getMicrosecond() << "<---\n";
00467 if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00468 {
00469 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00470 }
00471 }
00472 }
00473 }
00474 else
00475 {
00476 rc = -1;
00477 }
00478 if (rc < 0 && errorAsException)
00479 {
00480 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00481 }
00482 return rc;
00483 }
00485 int
00486 SocketBaseImpl::read(void* dataIn, int dataInLen, bool errorAsException)
00487 {
00488 int rc = 0;
00489 bool isError = false;
00490 if (m_isConnected)
00491 {
00492 isError = waitForInput(m_recvTimeout);
00493 if (isError)
00494 {
00495 rc = -1;
00496 }
00497 else
00498 {
00499 rc = readAux(dataIn, dataInLen);
00500 if (!m_traceFileIn.empty() && rc > 0)
00501 {
00502 MutexLock ml(guard);
00503 ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00504 if (!traceFile)
00505 {
00506 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening tracefile");
00507 }
00508 if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00509 {
00510 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00511 }
00512
00513 ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00514 if (!comboTraceFile)
00515 {
00516 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00517 }
00518 DateTime curDateTime;
00519 curDateTime.setToCurrent();
00520 comboTraceFile << "\n--->In " << rc << " bytes at " << curDateTime.toString("%X") <<
00521 '.' << curDateTime.getMicrosecond() << "<---\n";
00522 if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00523 {
00524 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00525 }
00526 }
00527 }
00528 }
00529 else
00530 {
00531 rc = -1;
00532 }
00533 if (rc < 0)
00534 {
00535 if (errorAsException)
00536 {
00537 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00538 }
00539 }
00540 return rc;
00541 }
00543 bool
00544 SocketBaseImpl::waitForInput(int timeOutSecs)
00545 {
00546 int rval = SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_INPUT);
00547 if (rval == ETIMEDOUT)
00548 {
00549 m_recvTimeoutExprd = true;
00550 }
00551 else
00552 {
00553 m_recvTimeoutExprd = false;
00554 }
00555 return (rval != 0);
00556 }
00558 bool
00559 SocketBaseImpl::waitForOutput(int timeOutSecs)
00560 {
00561 return SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00562 }
00564 istream&
00565 SocketBaseImpl::getInputStream()
00566 {
00567 return m_in;
00568 }
00570 ostream&
00571 SocketBaseImpl::getOutputStream()
00572 {
00573 return m_out;
00574 }
00576 iostream&
00577 SocketBaseImpl::getIOStream()
00578 {
00579 return m_inout;
00580 }
00582
00583 void
00584 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00585 {
00586 m_traceFileOut = out;
00587 m_traceFileIn = in;
00588 }
00589
00590 }
00591
00592 #endif // #if !defined(BLOCXX_WIN32)
00593