00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/PosixUnnamedPipe.hpp"
00039 #include "blocxx/AutoPtr.hpp"
00040 #include "blocxx/IOException.hpp"
00041 #include "blocxx/Format.hpp"
00042 #include "blocxx/SocketUtils.hpp"
00043 #include "blocxx/Assertion.hpp"
00044
00045 extern "C"
00046 {
00047 #ifdef BLOCXX_WIN32
00048 #define _CLOSE ::_close
00049 #define _WRITE ::_write
00050 #define _READ ::_read
00051 #define _OPEN ::_open
00052 #include <io.h>
00053 #else
00054 #ifdef BLOCXX_HAVE_UNISTD_H
00055 #include <unistd.h>
00056 #endif
00057 #define _CLOSE ::close
00058 #define _WRITE ::write
00059 #define _READ ::read
00060 #define _OPEN ::open
00061 #endif
00062
00063 #include <fcntl.h>
00064 #include <errno.h>
00065 }
00066 #include <cstring>
00067
00068 namespace BLOCXX_NAMESPACE
00069 {
00070
00071 #ifdef BLOCXX_NETWARE
00072 namespace
00073 {
/**
 * State shared between _pipe() and the helper thread that accepts the
 * loopback connection: the listening socket handed in by the creator and
 * the descriptor of the accepted connection (initialised to -1).
 */
class AcceptThread
{
public:
	AcceptThread(int serversock)
		: m_serversock(serversock)
		, m_serverconn(-1)
	{
	}

	/// Accepts one connection on the listening socket (defined out of line).
	void acceptConnection();

	/// @return the accepted connection's descriptor, or -1 if none was stored.
	int getConnectFD()
	{
		return m_serverconn;
	}

private:
	int m_serversock;	// listening socket supplied to the constructor
	int m_serverconn;	// accepted connection, -1 until one is stored
};
00089
00090 void
00091 AcceptThread::acceptConnection()
00092 {
00093 struct sockaddr_in sin;
00094 size_t val;
00095 int tmp = 1;
00096
00097 tmp = 1;
00098 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
00099 (char*) &tmp, sizeof(int));
00100
00101 val = sizeof(struct sockaddr_in);
00102 if ((m_serverconn = ::accept(m_serversock, (struct sockaddr*)&sin, &val))
00103 == -1)
00104 {
00105 return;
00106 }
00107 tmp = 1;
00108 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
00109 (char *) &tmp, sizeof(int));
00110 tmp = 0;
00111 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
00112 (char*) &tmp, sizeof(int));
00113 }
00114
00115 void*
00116 runConnClass(void* arg)
00117 {
00118 AcceptThread* acceptThread = (AcceptThread*)(arg);
00119 acceptThread->acceptConnection();
00120 ::pthread_exit(NULL);
00121 return 0;
00122 }
00123
00124 int
00125 _pipe(int *fds)
00126 {
00127 int svrfd, lerrno, connectfd;
00128 size_t val;
00129 struct sockaddr_in sin;
00130
00131 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
00132 sin.sin_family = AF_INET;
00133 sin.sin_addr.s_addr = htonl( 0x7f000001 );
00134 sin.sin_port = 0;
00135 memset(sin.sin_zero, 0, 8 );
00136 if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1)
00137 {
00138 int lerrno = errno;
00139 ::close(svrfd);
00140 fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
00141 return -1;
00142 }
00143 if (listen(svrfd, 1) == -1)
00144 {
00145 int lerrno = errno;
00146 ::close(svrfd);
00147 return -1;
00148 }
00149 val = sizeof(struct sockaddr_in);
00150 if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1)
00151 {
00152 int lerrno = errno;
00153 fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
00154 ::close(svrfd);
00155 return -1;
00156 }
00157
00158 AcceptThread* pat = new AcceptThread(svrfd);
00159 pthread_t athread;
00160
00161
00162 pthread_create(&athread, NULL, runConnClass, pat);
00163
00164 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
00165 if (clientfd == -1)
00166 {
00167 delete pat;
00168 return -1;
00169 }
00170
00171
00172 struct sockaddr_in csin;
00173 csin.sin_family = AF_INET;
00174 csin.sin_addr.s_addr = htonl(0x7f000001);
00175 csin.sin_port = sin.sin_port;
00176 if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
00177 {
00178 delete pat;
00179 return -1;
00180 }
00181
00182 #define TCP_NODELAY 1
00183 int tmp = 1;
00184
00185
00186
00187 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
00188 tmp = 0;
00189 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
00190
00191 void* threadResult;
00192
00193 ::pthread_join(athread, &threadResult);
00194
00195 ::close(svrfd);
00196 fds[0] = pat->getConnectFD();
00197 fds[1] = clientfd;
00198 delete pat;
00199 return 0;
00200 }
00201 }
00202 #endif // BLOCXX_NETWARE
00203
00205
00206 UnnamedPipeRef
00207 UnnamedPipe::createUnnamedPipe(EOpen doOpen)
00208 {
00209 return UnnamedPipeRef(new PosixUnnamedPipe(doOpen));
00210 }
00212 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen)
00213 #ifndef BLOCXX_WIN32
00214 : m_blocking(E_BLOCKING)
00215 #endif
00216 {
00217 #ifdef BLOCXX_WIN32
00218 m_blocking[0] = m_blocking[1] = E_BLOCKING;
00219 #endif
00220 m_fds[0] = m_fds[1] = -1;
00221 if (doOpen)
00222 {
00223 open();
00224 }
00225 setTimeouts(60 * 10);
00226 setBlocking(E_BLOCKING);
00227 }
00228
00230 PosixUnnamedPipe::~PosixUnnamedPipe()
00231 {
00232 close();
00233 }
00235 void
00236 PosixUnnamedPipe::setBlocking(EBlockingMode outputIsBlocking)
00237 {
00238 #ifdef BLOCXX_WIN32
00239
00240 BLOCXX_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00241
00242 m_blocking[0] = outputIsBlocking;
00243 m_blocking[1] = outputIsBlocking;
00244
00245
00246
00247
00248
00249
00250 return;
00251 #else
00252
00253 BLOCXX_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00254
00255 m_blocking = outputIsBlocking;
00256
00257 for (size_t i = 0; i <= 1; ++i)
00258 {
00259 int fdflags = fcntl(m_fds[i], F_GETFL, 0);
00260 if (fdflags == -1)
00261 {
00262 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00263 }
00264 if (outputIsBlocking == E_BLOCKING)
00265 {
00266 fdflags &= !O_NONBLOCK;
00267 }
00268 else
00269 {
00270 fdflags |= O_NONBLOCK;
00271 }
00272 if (fcntl(m_fds[i], F_SETFL, fdflags) == -1)
00273 {
00274 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00275 }
00276 }
00277
00278 #endif
00279 }
00281 void
00282 PosixUnnamedPipe::open()
00283 {
00284 if (m_fds[0] != -1)
00285 {
00286 close();
00287 }
00288 #if defined(BLOCXX_WIN32)
00289 HANDLE pipe = CreateNamedPipe( "\\\\.\\pipe\\TestPipe",
00290 PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED,
00291 PIPE_TYPE_MESSAGE,
00292 PIPE_UNLIMITED_INSTANCES,
00293 2560,
00294 2560,
00295 NMPWAIT_USE_DEFAULT_WAIT,
00296 NULL );
00297
00298 HANDLE client = CreateFile( "\\\\.\\pipe\\TestPipe",
00299 GENERIC_READ,
00300 FILE_SHARE_READ,
00301 NULL,
00302 OPEN_EXISTING,
00303 FILE_FLAG_OVERLAPPED,
00304 NULL );
00305
00306 HANDLE event1 = CreateEvent(NULL, TRUE, FALSE, NULL);
00307 HANDLE event2 = CreateEvent(NULL, TRUE, FALSE, NULL);
00308
00309
00310 BOOL bConnected = ConnectNamedPipe( pipe, NULL );
00311 if( !bConnected && GetLastError() == ERROR_PIPE_CONNECTED )
00312 bConnected = TRUE;
00313
00314 BOOL bSuccess =
00315 pipe != INVALID_HANDLE_VALUE &&
00316 client != INVALID_HANDLE_VALUE &&
00317 event1 != INVALID_HANDLE_VALUE &&
00318 event2 != INVALID_HANDLE_VALUE &&
00319 bConnected;
00320
00321 if( !bSuccess )
00322 {
00323 CloseHandle(pipe);
00324 CloseHandle(client);
00325 CloseHandle(event1);
00326 CloseHandle(event2);
00327 }
00328 else
00329 {
00330 m_fds[0] = (int)client;
00331 m_fds[1] = (int)pipe;
00332 m_events[0] = (int)event1;
00333 m_events[1] = (int)event2;
00334 }
00335
00336 if( !bSuccess )
00337
00338 #elif defined(BLOCXX_NETWARE)
00339 if (_pipe(m_fds) == -1)
00340 #else
00341 if (::pipe(m_fds) == -1)
00342 #endif
00343 {
00344 m_fds[0] = m_fds[1] = -1;
00345 BLOCXX_THROW(UnnamedPipeException, ::strerror(errno));
00346 }
00347 }
00349 int
00350 PosixUnnamedPipe::close()
00351 {
00352 int rc = -1;
00353 if (m_fds[0] != -1)
00354 {
00355 #ifdef BLOCXX_WIN32
00356 HANDLE h = (HANDLE)m_fds[0];
00357 HANDLE e = (HANDLE)m_events[0];
00358 if( CloseHandle(h) && CloseHandle(e) )
00359 rc = 0;
00360 #else
00361 rc = _CLOSE(m_fds[0]);
00362 #endif
00363 m_fds[0] = -1;
00364 }
00365 if (m_fds[1] != -1)
00366 {
00367 #ifdef BLOCXX_WIN32
00368 HANDLE h = (HANDLE)m_fds[1];
00369 HANDLE e = (HANDLE)m_events[1];
00370 if( CloseHandle(h) && CloseHandle(e) )
00371 rc = 0;
00372 #else
00373 rc = _CLOSE(m_fds[1]);
00374 #endif
00375 m_fds[1] = -1;
00376 }
00377 return rc;
00378 }
00380 bool
00381 PosixUnnamedPipe::isOpen() const
00382 {
00383 return (m_fds[0] != -1) || (m_fds[1] != -1);
00384 }
00385
00387 int
00388 PosixUnnamedPipe::closeInputHandle()
00389 {
00390 int rc = -1;
00391 if (m_fds[0] != -1)
00392 {
00393 #ifdef BLOCXX_WIN32
00394 HANDLE h = (HANDLE)m_fds[0];
00395 HANDLE e = (HANDLE)m_events[0];
00396 if( CloseHandle(h) && CloseHandle(e) )
00397 rc = 0;
00398 #else
00399 rc = _CLOSE(m_fds[0]);
00400 #endif
00401 m_fds[0] = -1;
00402 }
00403 return rc;
00404 }
00406 int
00407 PosixUnnamedPipe::closeOutputHandle()
00408 {
00409 int rc = -1;
00410 if (m_fds[1] != -1)
00411 {
00412 #ifdef BLOCXX_WIN32
00413 HANDLE h = (HANDLE)m_fds[1];
00414 HANDLE e = (HANDLE)m_events[1];
00415 if( CloseHandle(h) && CloseHandle(e) )
00416 rc = 0;
00417 #else
00418 rc = _CLOSE(m_fds[1]);
00419 #endif
00420 m_fds[1] = -1;
00421 }
00422 return rc;
00423 }
00425 int
00426 PosixUnnamedPipe::write(const void* data, int dataLen, bool errorAsException)
00427 {
00428 int rc = -1;
00429 if (m_fds[1] != -1)
00430 {
00431 #ifndef BLOCXX_WIN32
00432 if (m_blocking == E_BLOCKING)
00433 {
00434 rc = SocketUtils::waitForIO(m_fds[1], m_writeTimeout, SocketFlags::E_WAIT_FOR_OUTPUT);
00435 if (rc != 0)
00436 {
00437 if (errorAsException)
00438 {
00439 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00440 }
00441 else
00442 {
00443 return rc;
00444 }
00445 }
00446 }
00447 rc = _WRITE(m_fds[1], data, dataLen);
00448 #else
00449 BOOL bSuccess = FALSE;
00450
00451 OVERLAPPED ovl;
00452
00453 ovl.hEvent = (HANDLE)m_events[1];
00454 ovl.Offset = 0;
00455 ovl.OffsetHigh = 0;
00456
00457 bSuccess = WriteFile(
00458 (HANDLE)m_fds[1],
00459 data,
00460 dataLen,
00461 NULL,
00462 &ovl);
00463
00464 if( bSuccess && m_blocking[1] == E_BLOCKING )
00465 {
00466 bSuccess = WaitForSingleObject( (HANDLE)m_events[1], INFINITE ) == WAIT_OBJECT_0;
00467 }
00468
00469 if( bSuccess )
00470 rc = 0;
00471 #endif
00472 }
00473 if (errorAsException && rc == -1)
00474 {
00475 BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed.");
00476 }
00477 return rc;
00478 }
00480 int
00481 PosixUnnamedPipe::read(void* buffer, int bufferLen, bool errorAsException)
00482 {
00483 int rc = -1;
00484 if (m_fds[0] != -1)
00485 {
00486 #ifndef BLOCXX_WIN32
00487 if (m_blocking == E_BLOCKING)
00488 {
00489 rc = SocketUtils::waitForIO(m_fds[0], m_readTimeout, SocketFlags::E_WAIT_FOR_INPUT);
00490 if (rc != 0)
00491 {
00492 if (errorAsException)
00493 {
00494 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00495 }
00496 else
00497 {
00498 return rc;
00499 }
00500 }
00501 }
00502 rc = _READ(m_fds[0], buffer, bufferLen);
00503 #else
00504 BOOL bSuccess = FALSE;
00505
00506 OVERLAPPED ovl;
00507
00508 ovl.hEvent = (HANDLE)m_events[0];
00509 ovl.Offset = 0;
00510 ovl.OffsetHigh = 0;
00511
00512 bSuccess = ReadFile(
00513 (HANDLE)m_fds[0],
00514 buffer,
00515 bufferLen,
00516 NULL,
00517 &ovl);
00518
00519 if( bSuccess && m_blocking[0] == E_BLOCKING )
00520 {
00521 bSuccess = WaitForSingleObject( (HANDLE)m_events[0], INFINITE ) == WAIT_OBJECT_0;
00522 }
00523
00524 if( bSuccess )
00525 rc = 0;
00526 #endif
00527 }
00528 if (errorAsException && rc == -1)
00529 {
00530 BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed.");
00531 }
00532 return rc;
00533 }
00535 Select_t
00536 PosixUnnamedPipe::getSelectObj() const
00537 {
00538 #ifdef BLOCXX_WIN32
00539 Select_t selectObj;
00540 selectObj.event = (HANDLE)m_events[0];
00541 selectObj.sockfd = INVALID_SOCKET;
00542 selectObj.networkevents = 0;
00543 selectObj.doreset = false;
00544
00545 return selectObj;
00546 #else
00547 return m_fds[0];
00548 #endif
00549 }
00550
00552 Select_t
00553 PosixUnnamedPipe::getWriteSelectObj() const
00554 {
00555 #ifdef BLOCXX_WIN32
00556 Select_t selectObj;
00557 selectObj.event = (HANDLE)m_events[1];
00558 selectObj.sockfd = INVALID_SOCKET;
00559 selectObj.networkevents = 0;
00560 selectObj.doreset = false;
00561
00562 return selectObj;
00563 #else
00564 return m_fds[1];
00565 #endif
00566 }
00567
00568 }
00569