ThreadImpl.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2004 Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2005 Novell, Inc. All rights reserved.
00004 *
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 *
00008 *  - Redistributions of source code must retain the above copyright notice,
00009 *    this list of conditions and the following disclaimer.
00010 *
00011 *  - Redistributions in binary form must reproduce the above copyright notice,
00012 *    this list of conditions and the following disclaimer in the documentation
00013 *    and/or other materials provided with the distribution.
00014 *
00015 *  - Neither the name of Vintela, Inc., Novell, Inc., nor the names of its
00016 *    contributors may be used to endorse or promote products derived from this
00017 *    software without specific prior written permission.
00018 *
00019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00020 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00021 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00022 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc., Novell, Inc., OR THE 
00023 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00024 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00025 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
00026 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00027 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
00028 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
00029 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 *******************************************************************************/
00031 
00032 
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/ThreadImpl.hpp"
00039 #include "blocxx/Mutex.hpp"
00040 #include "blocxx/Assertion.hpp"
00041 #include "blocxx/Thread.hpp"
00042 #include "blocxx/NonRecursiveMutexLock.hpp"
00043 #include "blocxx/Format.hpp"
00044 #if defined(BLOCXX_WIN32)
00045 #include "blocxx/Map.hpp"
00046 #include "blocxx/MutexLock.hpp"
00047 #endif
00048 #include <cassert>
00049 #include <cstring>
00050 #include <cstddef>
00051 
00052 extern "C"
00053 {
00054 #ifdef BLOCXX_HAVE_SYS_TIME_H
00055 #include <sys/time.h>
00056 #endif
00057 
00058 #include <sys/types.h>
00059 
00060 #ifdef BLOCXX_HAVE_UNISTD_H
00061 #include <unistd.h>
00062 #endif
00063 
00064 #include <errno.h>
00065 #include <signal.h>
00066 
00067 #ifdef BLOCXX_USE_PTHREAD
00068 #include <pthread.h>
00069 #endif
00070 
00071 #ifdef BLOCXX_WIN32
00072 #include <process.h>
00073 #endif
00074 }
00075 
00076 namespace BLOCXX_NAMESPACE
00077 {
00078 
00079 namespace ThreadImpl {
00080 
00082 // STATIC
00083 void
00084 sleep(UInt32 milliSeconds)
00085 {
00086    ThreadImpl::testCancel();
00087 #if defined(BLOCXX_HAVE_NANOSLEEP)
00088    struct timespec wait;
00089    wait.tv_sec = milliSeconds / 1000;
00090    wait.tv_nsec = (milliSeconds % 1000) * 1000000;
00091    while (nanosleep(&wait, &wait) == -1 && errno == EINTR)
00092    {
00093       ThreadImpl::testCancel();
00094    }
00095 #elif BLOCXX_WIN32
00096    Sleep(milliSeconds);
00097 #else
00098    timeval now, end;
00099    unsigned long microSeconds = milliSeconds * 1000;
00100    const UInt32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00101    gettimeofday(&now, NULL);
00102    end = now;
00103    end.tv_sec  += microSeconds / 1000000;
00104    end.tv_usec += microSeconds % 1000000;
00105    while ((now.tv_sec < end.tv_sec)
00106        || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec)))
00107    {
00108       timeval tv;
00109       tv.tv_sec = end.tv_sec - now.tv_sec;
00110       if (end.tv_usec >= now.tv_usec)
00111       {
00112          tv.tv_usec = end.tv_usec - now.tv_usec;
00113       }
00114       else
00115       {
00116          tv.tv_sec--;
00117          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00118       }
00119       if (tv.tv_sec > 0 || tv.tv_usec > loopMicroSeconds)
00120       {
00121          tv.tv_sec = 0;
00122          tv.tv_usec = loopMicroSeconds;
00123       }
00124       ThreadImpl::testCancel();
00125       select(0, NULL, NULL, NULL, &tv);
00126       gettimeofday(&now, NULL);
00127    }
00128 #endif
00129 }
00131 // STATIC
00132 void
00133 yield()
00134 {
00135 #if defined(BLOCXX_HAVE_SCHED_YIELD)
00136    sched_yield();
00137 #elif defined(BLOCXX_WIN32)
00138    ThreadImpl::testCancel();
00139    ::SwitchToThread();
00140 #else
00141    ThreadImpl::sleep(1);
00142 #endif
00143 }
00144 
00145 #if defined(BLOCXX_USE_PTHREAD)
00146 namespace {
00147 struct LocalThreadParm
00148 {
00149    ThreadFunction m_func;
00150    void* m_funcParm;
00151 };
00152 extern "C" {
00153 static void*
00154 threadStarter(void* arg)
00155 {
00156    // set our cancellation state to asynchronous, so we can actually be
00157    // killed if need be.
00158    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00159    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00160 
00161    // block all signals except SIGUSR1, which is used to signal termination
00162    sigset_t signalSet;
00163    int rv = sigfillset(&signalSet);
00164    BLOCXX_ASSERT(rv == 0);
00165    rv = sigdelset(&signalSet, SIGUSR1);
00166    BLOCXX_ASSERT(rv == 0);
00167    rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
00168    BLOCXX_ASSERT(rv == 0);
00169 
00170    LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
00171    ThreadFunction func = parg->m_func;
00172    void* funcParm = parg->m_funcParm;
00173    delete parg;
00174    Int32 rval = (*func)(funcParm);
00175    void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00176    pthread_exit(prval);
00177    return prval;
00178 }
00179 }
00180 // The purpose of this class is to retrieve the default stack size only once
00181 // at library load time and re-use it thereafter.
00182 struct default_stack_size
00183 {
00184    default_stack_size()
00185    {
00186       // if anything in this function fails, we'll just leave val == 0.
00187       val = 0;
00188       needsSetting = false;
00189 
00190 // make sure we have a big enough stack.  blocxx can use quite a bit, so we'll try to make sure we get at least 1 MB.
00191 // 1 MB is just an arbitrary number.  The default on Linux is 2 MB which has never been a problem.  However, on UnixWare
00192 // the default is really low (16K IIRC) and that isn't enough. It would be good to do some sort of measurement...
00193 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
00194       pthread_attr_t stack_size_attr;
00195       if (pthread_attr_init(&stack_size_attr) != 0)
00196       {
00197          return;
00198       }
00199       if (pthread_attr_getstacksize(&stack_size_attr, &val) != 0)
00200       {
00201          return;
00202       }
00203 
00204       if (val < 1048576) 
00205       {
00206          val = 1048576; // 1 MB
00207          needsSetting = true;
00208       }
00209 #ifdef PTHREAD_STACK_MIN
00210       if (PTHREAD_STACK_MIN > val) 
00211       {
00212          val = PTHREAD_STACK_MIN;
00213          needsSetting = true;
00214       }
00215 #endif
00216 #endif
00217    }
00218    static size_t val;
00219    static bool needsSetting;
00220 };
00221 size_t default_stack_size::val = 0;
00222 bool default_stack_size::needsSetting(false);
00223 default_stack_size g_theDefaultStackSize;
00225 pthread_once_t once_control = PTHREAD_ONCE_INIT;
00226 pthread_key_t theKey;
00227 extern "C" {
00229 static void initializeTheKey()
00230 {
00231    pthread_key_create(&theKey,NULL);
00232    // set SIGUSR1 to SIG_IGN so we can safely send it to threads when we want to cancel them.
00233    struct sigaction temp;
00234    memset(&temp, '\0', sizeof(temp));
00235    sigaction(SIGUSR1, 0, &temp);
00236    if (temp.sa_handler != SIG_IGN)
00237    {
00238       temp.sa_handler = SIG_IGN;
00239       sigemptyset(&temp.sa_mask);
00240       temp.sa_flags = 0;
00241       sigaction(SIGUSR1, &temp, NULL);
00242    }
00243 }
00244 } // end extern "C"
00245 } // end unnamed namespace
00247 // STATIC
00248 int
00249 createThread(Thread_t& handle, ThreadFunction func,
00250    void* funcParm, UInt32 threadFlags)
00251 {
00252    int cc = 0;
00253    pthread_attr_t attr;
00254    pthread_attr_init(&attr);
00255    if (!(threadFlags & BLOCXX_THREAD_FLG_JOINABLE))
00256    {
00257       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00258    }
00259 
00260 #if !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00261    // Won't be set to true unless _POSIX_THREAD_ATTR_STACKSIZE is defined
00262    if (default_stack_size::needsSetting)
00263    {
00264       pthread_attr_setstacksize(&attr, default_stack_size::val);
00265    }
00266 #endif
00267 
00268    LocalThreadParm* parg = new LocalThreadParm;
00269    parg->m_func = func;
00270    parg->m_funcParm = funcParm;
00271    if (pthread_create(&handle, &attr, threadStarter, parg) != 0)
00272    {
00273       cc = -1;
00274    }
00275    pthread_attr_destroy(&attr);
00276    return cc;
00277 }
00279 // STATIC
00280 void
00281 exitThread(Thread_t&, Int32 rval)
00282 {
00283    void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00284    pthread_exit(prval);
00285 }
00286 
00287 
00288 #if defined(BLOCXX_SIZEOF_PTHREAD_T)
00289 #if BLOCXX_SIZEOF_PTHREAD_T == 2
00290 #define BLOCXX_THREAD_CONVERTER UInt16
00291 #elif BLOCXX_SIZEOF_PTHREAD_T == 4
00292 #define BLOCXX_THREAD_CONVERTER UInt32
00293 #elif BLOCXX_SIZEOF_PTHREAD_T == 8
00294 #define BLOCXX_THREAD_CONVERTER UInt64
00295 #else
00296 #error Unexpected size for pthread_t
00297 #endif /* BLOCXX_SIZEOF_PTHREAD_T */
00298 #else
00299 #error No pthread_t size was found!
00300 #endif /* defined(BLOCXX_SIZEOF_PTHREAD_T) */
00301 
00302 UInt64 thread_t_ToUInt64(Thread_t thr)
00303 {
00304    return UInt64(BLOCXX_THREAD_CONVERTER(thr));
00305 }
00306 #undef BLOCXX_THREAD_CONVERTER
00307 
00309 // STATIC
00310 void
00311 destroyThread(Thread_t& )
00312 {
00313 }
00315 // STATIC
00316 int
00317 setThreadDetached(Thread_t& handle)
00318 {
00319    int cc = pthread_detach(handle);
00320    if (cc != 0)
00321    {
00322       if (cc != EINVAL)
00323       {
00324          cc = -1;
00325       }
00326    }
00327    return cc;
00328 }
00330 // STATIC
00331 int
00332 joinThread(Thread_t& handle, Int32& rval)
00333 {
00334    void* prval(0);
00335    if ((errno = pthread_join(handle, &prval)) == 0)
00336    {
00337       rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
00338       return 0;
00339    }
00340    else
00341    {
00342       return 1;
00343    }
00344 }
00346 void
00347 testCancel()
00348 {
00349    // set up our TLS which will be used to store the Thread* in.
00350    pthread_once(&once_control, &initializeTheKey);
00351    Thread* theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
00352    if (theThread == 0)
00353    {
00354       return;
00355    }
00356    NonRecursiveMutexLock l(theThread->m_cancelLock);
00357    if (theThread->m_cancelRequested)
00358    {
00359       // We don't use BLOCXX_THROW here because 
00360       // ThreadCancelledException is special.  It's not derived
00361       // from Exception on purpose so it can be propagated up
00362       // the stack easier. This exception shouldn't be caught and not
00363       // re-thrown anywhere except in Thread::threadRunner()
00364       throw ThreadCancelledException();
00365    }
00366 }
00368 void saveThreadInTLS(void* pTheThread)
00369 {
00370    // set up our TLS which will be used to store the Thread* in.
00371    pthread_once(&once_control, &initializeTheKey);
00372    int rc;
00373    if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
00374    {
00375       BLOCXX_THROW(ThreadException, Format("pthread_setspecific failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00376    }
00377 }
00379 void sendSignalToThread(Thread_t threadID, int signo)
00380 {
00381    int rc;
00382    if ((rc = pthread_kill(threadID, signo)) != 0)
00383    {
00384       BLOCXX_THROW(ThreadException, Format("pthread_kill failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00385    }
00386 }
00388 void cancel(Thread_t threadID)
00389 {
00390    int rc;
00391    if ((rc = pthread_cancel(threadID)) != 0)
00392    {
00393       BLOCXX_THROW(ThreadException, Format("pthread_cancel failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00394    }
00395 }
00396 #endif // #ifdef BLOCXX_USE_PTHREAD
00397 
00398 #if defined(BLOCXX_WIN32)
00399 
00400 namespace {
00401 
00402 struct WThreadInfo
00403 {
00404    HANDLE   handle;
00405    BLOCXX_NAMESPACE::Thread* pTheThread;
00406 };
00407 
00408 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
00409 Win32ThreadMap g_threads;
00410 Mutex g_threadsGuard;
00411 
00412 struct LocalThreadParm
00413 {
00414    ThreadFunction m_func;
00415    void* m_funcParm;
00416 };
00417 
00419 extern "C" {
00420 unsigned __stdcall threadStarter(void* arg)
00421 {
00422    LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
00423    ThreadFunction func = parg->m_func;
00424    void* funcParm = parg->m_funcParm;
00425    delete parg;
00426    Int32 rval = (*func)(funcParm);
00427    ::_endthreadex(static_cast<unsigned>(rval));
00428    return rval;
00429 }
00430 }  // End extern "C"
00431 
00433 void
00434 addThreadToMap(DWORD threadId, HANDLE threadHandle)
00435 {
00436    MutexLock ml(g_threadsGuard);
00437    WThreadInfo wi;
00438    wi.handle = threadHandle;
00439    wi.pTheThread = 0;
00440    g_threads[threadId] = wi;
00441 }
00442 
00444 HANDLE
00445 getThreadHandle(DWORD threadId)
00446 {
00447    MutexLock ml(g_threadsGuard);
00448    HANDLE chdl = 0;
00449    Win32ThreadMap::iterator it = g_threads.find(threadId);
00450    if (it != g_threads.end())
00451    {
00452       chdl = it->second.handle;
00453    }
00454    return chdl;
00455 }
00456 
00458 void
00459 setThreadPointer(DWORD threadId, Thread* pTheThread)
00460 {
00461    MutexLock ml(g_threadsGuard);
00462    Win32ThreadMap::iterator it = g_threads.find(threadId);
00463    if (it != g_threads.end())
00464    {
00465       it->second.pTheThread = pTheThread;
00466    }
00467 }
00468 
00470 HANDLE
00471 removeThreadFromMap(DWORD threadId)
00472 {
00473    MutexLock ml(g_threadsGuard);
00474    HANDLE chdl = 0;
00475    Win32ThreadMap::iterator it = g_threads.find(threadId);
00476    if (it != g_threads.end())
00477    {
00478       chdl = it->second.handle;
00479       g_threads.erase(it);
00480    }
00481    return chdl;
00482 }
00483 
00485 Thread*
00486 getThreadObject(DWORD threadId)
00487 {
00488    Thread* pTheThread = 0;
00489    MutexLock ml(g_threadsGuard);
00490    Win32ThreadMap::iterator it = g_threads.find(threadId);
00491    if (it != g_threads.end())
00492    {
00493       pTheThread = it->second.pTheThread;
00494    }
00495    return pTheThread;
00496 }
00497 
00498 }  // End unnamed namespace
00499 
00501 // STATIC
00502 int
00503 createThread(Thread_t& handle, ThreadFunction func,
00504    void* funcParm, UInt32 threadFlags)
00505 {
00506    int cc = -1;
00507    HANDLE hThread;
00508    unsigned threadId;
00509 
00510    LocalThreadParm* parg = new LocalThreadParm;
00511    parg->m_func = func;
00512    parg->m_funcParm = funcParm;
00513    hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
00514       parg, 0, &threadId));
00515    if (hThread != 0)
00516    {
00517       addThreadToMap(threadId, hThread);
00518       handle = threadId;
00519       cc = 0;
00520    }
00521 
00522    return cc;
00523 }
00525 // STATIC
00526 void
00527 exitThread(Thread_t&, Int32 rval)
00528 {
00529    ::_endthreadex(static_cast<unsigned>(rval));
00530 }
00531 
00533 // STATIC
00534 UInt64 thread_t_ToUInt64(Thread_t thr)
00535 {
00536    //  This should really be a compile time assert.
00537    BLOCXX_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t),"  Thread_t truncated!");
00538    return static_cast<UInt64>(thr);
00539 }
00540 
00542 // STATIC
00543 void
00544 destroyThread(Thread_t& threadId)
00545 {
00546    HANDLE thdl = removeThreadFromMap(threadId);
00547    if (thdl != 0)
00548    {
00549       ::CloseHandle(thdl);
00550    }
00551 }
00553 // STATIC
00554 int
00555 setThreadDetached(Thread_t& handle)
00556 {
00557    // No need for this on Win32
00558    return 0;
00559 }
00561 // STATIC
00562 int
00563 joinThread(Thread_t& threadId, Int32& rvalArg)
00564 {
00565    int cc = -1;
00566    DWORD rval;
00567    HANDLE thdl = getThreadHandle(threadId);
00568    if (thdl != 0)
00569    {
00570       if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
00571       {
00572          if (::GetExitCodeThread(thdl, &rval) != 0)
00573          {
00574             rvalArg = static_cast<Int32>(rval);
00575             cc = 0;
00576          }
00577       }
00578    }
00579    return cc;
00580 }
00581 
00583 void
00584 testCancel()
00585 {
00586    DWORD threadId = ThreadImpl::currentThread();
00587    Thread* pTheThread = getThreadObject(threadId);
00588    if (pTheThread)
00589    {
00590       NonRecursiveMutexLock l(pTheThread->m_cancelLock);
00591       if (pTheThread->m_cancelRequested)
00592       {
00593          // We don't use BLOCXX_THROW here because 
00594          // ThreadCancelledException is special.  It's not derived
00595          // from Exception on purpose so it can be propagated up
00596          // the stack easier. This exception shouldn't be caught and not
00597          // re-thrown anywhere except in Thread::threadRunner()
00598          throw ThreadCancelledException();
00599       }
00600    }
00601 }
00603 void saveThreadInTLS(void* pThreadArg)
00604 {
00605    Thread* pThread = static_cast<Thread*>(pThreadArg);
00606    DWORD threadId = pThread->getId();
00607     setThreadPointer(threadId, pThread);
00608 }
00610 void sendSignalToThread(Thread_t threadID, int signo)
00611 {
00612 }
00614 void cancel(Thread_t threadId)
00615 {
00616    HANDLE thdl = getThreadHandle(threadId);
00617    if (thdl != 0)
00618    {
00619       ::TerminateThread(thdl, -1);
00620    }
00621 }
00622 
00623 #endif // #ifdef BLOCXX_WIN32
00624 } // end namespace BLOCXX_ThreadImpl
00625 
00626 } // end namespace BLOCXX_NAMESPACE
00627 

Generated on Fri Jun 16 15:39:09 2006 for blocxx by  doxygen 1.4.6