00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/ThreadImpl.hpp"
00039 #include "blocxx/Mutex.hpp"
00040 #include "blocxx/Assertion.hpp"
00041 #include "blocxx/Thread.hpp"
00042 #include "blocxx/NonRecursiveMutexLock.hpp"
00043 #include "blocxx/Format.hpp"
00044 #if defined(BLOCXX_WIN32)
00045 #include "blocxx/Map.hpp"
00046 #include "blocxx/MutexLock.hpp"
00047 #endif
00048 #include <cassert>
00049 #include <cstring>
00050 #include <cstddef>
00051
00052 extern "C"
00053 {
00054 #ifdef BLOCXX_HAVE_SYS_TIME_H
00055 #include <sys/time.h>
00056 #endif
00057
00058 #include <sys/types.h>
00059
00060 #ifdef BLOCXX_HAVE_UNISTD_H
00061 #include <unistd.h>
00062 #endif
00063
00064 #include <errno.h>
00065 #include <signal.h>
00066
00067 #ifdef BLOCXX_USE_PTHREAD
00068 #include <pthread.h>
00069 #endif
00070
00071 #ifdef BLOCXX_WIN32
00072 #include <process.h>
00073 #endif
00074 }
00075
00076 namespace BLOCXX_NAMESPACE
00077 {
00078
00079 namespace ThreadImpl {
00080
00082
00083 void
00084 sleep(UInt32 milliSeconds)
00085 {
00086 ThreadImpl::testCancel();
00087 #if defined(BLOCXX_HAVE_NANOSLEEP)
00088 struct timespec wait;
00089 wait.tv_sec = milliSeconds / 1000;
00090 wait.tv_nsec = (milliSeconds % 1000) * 1000000;
00091 while (nanosleep(&wait, &wait) == -1 && errno == EINTR)
00092 {
00093 ThreadImpl::testCancel();
00094 }
00095 #elif BLOCXX_WIN32
00096 Sleep(milliSeconds);
00097 #else
00098 timeval now, end;
00099 unsigned long microSeconds = milliSeconds * 1000;
00100 const UInt32 loopMicroSeconds = 100 * 1000;
00101 gettimeofday(&now, NULL);
00102 end = now;
00103 end.tv_sec += microSeconds / 1000000;
00104 end.tv_usec += microSeconds % 1000000;
00105 while ((now.tv_sec < end.tv_sec)
00106 || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec)))
00107 {
00108 timeval tv;
00109 tv.tv_sec = end.tv_sec - now.tv_sec;
00110 if (end.tv_usec >= now.tv_usec)
00111 {
00112 tv.tv_usec = end.tv_usec - now.tv_usec;
00113 }
00114 else
00115 {
00116 tv.tv_sec--;
00117 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00118 }
00119 if (tv.tv_sec > 0 || tv.tv_usec > loopMicroSeconds)
00120 {
00121 tv.tv_sec = 0;
00122 tv.tv_usec = loopMicroSeconds;
00123 }
00124 ThreadImpl::testCancel();
00125 select(0, NULL, NULL, NULL, &tv);
00126 gettimeofday(&now, NULL);
00127 }
00128 #endif
00129 }
00131
00132 void
00133 yield()
00134 {
00135 #if defined(BLOCXX_HAVE_SCHED_YIELD)
00136 sched_yield();
00137 #elif defined(BLOCXX_WIN32)
00138 ThreadImpl::testCancel();
00139 ::SwitchToThread();
00140 #else
00141 ThreadImpl::sleep(1);
00142 #endif
00143 }
00144
00145 #if defined(BLOCXX_USE_PTHREAD)
00146 namespace {
00147 struct LocalThreadParm
00148 {
00149 ThreadFunction m_func;
00150 void* m_funcParm;
00151 };
00152 extern "C" {
00153 static void*
00154 threadStarter(void* arg)
00155 {
00156
00157
00158 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00159 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00160
00161
00162 sigset_t signalSet;
00163 int rv = sigfillset(&signalSet);
00164 BLOCXX_ASSERT(rv == 0);
00165 rv = sigdelset(&signalSet, SIGUSR1);
00166 BLOCXX_ASSERT(rv == 0);
00167 rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
00168 BLOCXX_ASSERT(rv == 0);
00169
00170 LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
00171 ThreadFunction func = parg->m_func;
00172 void* funcParm = parg->m_funcParm;
00173 delete parg;
00174 Int32 rval = (*func)(funcParm);
00175 void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00176 pthread_exit(prval);
00177 return prval;
00178 }
00179 }
00180
00181
00182 struct default_stack_size
00183 {
00184 default_stack_size()
00185 {
00186
00187 val = 0;
00188 needsSetting = false;
00189
00190
00191
00192
00193 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
00194 pthread_attr_t stack_size_attr;
00195 if (pthread_attr_init(&stack_size_attr) != 0)
00196 {
00197 return;
00198 }
00199 if (pthread_attr_getstacksize(&stack_size_attr, &val) != 0)
00200 {
00201 return;
00202 }
00203
00204 if (val < 1048576)
00205 {
00206 val = 1048576;
00207 needsSetting = true;
00208 }
00209 #ifdef PTHREAD_STACK_MIN
00210 if (PTHREAD_STACK_MIN > val)
00211 {
00212 val = PTHREAD_STACK_MIN;
00213 needsSetting = true;
00214 }
00215 #endif
00216 #endif
00217 }
00218 static size_t val;
00219 static bool needsSetting;
00220 };
00221 size_t default_stack_size::val = 0;
00222 bool default_stack_size::needsSetting(false);
00223 default_stack_size g_theDefaultStackSize;
00225 pthread_once_t once_control = PTHREAD_ONCE_INIT;
00226 pthread_key_t theKey;
00227 extern "C" {
00229 static void initializeTheKey()
00230 {
00231 pthread_key_create(&theKey,NULL);
00232
00233 struct sigaction temp;
00234 memset(&temp, '\0', sizeof(temp));
00235 sigaction(SIGUSR1, 0, &temp);
00236 if (temp.sa_handler != SIG_IGN)
00237 {
00238 temp.sa_handler = SIG_IGN;
00239 sigemptyset(&temp.sa_mask);
00240 temp.sa_flags = 0;
00241 sigaction(SIGUSR1, &temp, NULL);
00242 }
00243 }
00244 }
00245 }
00247
00248 int
00249 createThread(Thread_t& handle, ThreadFunction func,
00250 void* funcParm, UInt32 threadFlags)
00251 {
00252 int cc = 0;
00253 pthread_attr_t attr;
00254 pthread_attr_init(&attr);
00255 if (!(threadFlags & BLOCXX_THREAD_FLG_JOINABLE))
00256 {
00257 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00258 }
00259
00260 #if !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00261
00262 if (default_stack_size::needsSetting)
00263 {
00264 pthread_attr_setstacksize(&attr, default_stack_size::val);
00265 }
00266 #endif
00267
00268 LocalThreadParm* parg = new LocalThreadParm;
00269 parg->m_func = func;
00270 parg->m_funcParm = funcParm;
00271 if (pthread_create(&handle, &attr, threadStarter, parg) != 0)
00272 {
00273 cc = -1;
00274 }
00275 pthread_attr_destroy(&attr);
00276 return cc;
00277 }
00279
00280 void
00281 exitThread(Thread_t&, Int32 rval)
00282 {
00283 void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00284 pthread_exit(prval);
00285 }
00286
00287
00288 #if defined(BLOCXX_SIZEOF_PTHREAD_T)
00289 #if BLOCXX_SIZEOF_PTHREAD_T == 2
00290 #define BLOCXX_THREAD_CONVERTER UInt16
00291 #elif BLOCXX_SIZEOF_PTHREAD_T == 4
00292 #define BLOCXX_THREAD_CONVERTER UInt32
00293 #elif BLOCXX_SIZEOF_PTHREAD_T == 8
00294 #define BLOCXX_THREAD_CONVERTER UInt64
00295 #else
00296 #error Unexpected size for pthread_t
00297 #endif
00298 #else
00299 #error No pthread_t size was found!
00300 #endif
00301
00302 UInt64 thread_t_ToUInt64(Thread_t thr)
00303 {
00304 return UInt64(BLOCXX_THREAD_CONVERTER(thr));
00305 }
00306 #undef BLOCXX_THREAD_CONVERTER
00307
00309
00310 void
00311 destroyThread(Thread_t& )
00312 {
00313 }
00315
00316 int
00317 setThreadDetached(Thread_t& handle)
00318 {
00319 int cc = pthread_detach(handle);
00320 if (cc != 0)
00321 {
00322 if (cc != EINVAL)
00323 {
00324 cc = -1;
00325 }
00326 }
00327 return cc;
00328 }
00330
00331 int
00332 joinThread(Thread_t& handle, Int32& rval)
00333 {
00334 void* prval(0);
00335 if ((errno = pthread_join(handle, &prval)) == 0)
00336 {
00337 rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
00338 return 0;
00339 }
00340 else
00341 {
00342 return 1;
00343 }
00344 }
00346 void
00347 testCancel()
00348 {
00349
00350 pthread_once(&once_control, &initializeTheKey);
00351 Thread* theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
00352 if (theThread == 0)
00353 {
00354 return;
00355 }
00356 NonRecursiveMutexLock l(theThread->m_cancelLock);
00357 if (theThread->m_cancelRequested)
00358 {
00359
00360
00361
00362
00363
00364 throw ThreadCancelledException();
00365 }
00366 }
00368 void saveThreadInTLS(void* pTheThread)
00369 {
00370
00371 pthread_once(&once_control, &initializeTheKey);
00372 int rc;
00373 if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
00374 {
00375 BLOCXX_THROW(ThreadException, Format("pthread_setspecific failed. error = %1(%2)", rc, strerror(rc)).c_str());
00376 }
00377 }
00379 void sendSignalToThread(Thread_t threadID, int signo)
00380 {
00381 int rc;
00382 if ((rc = pthread_kill(threadID, signo)) != 0)
00383 {
00384 BLOCXX_THROW(ThreadException, Format("pthread_kill failed. error = %1(%2)", rc, strerror(rc)).c_str());
00385 }
00386 }
00388 void cancel(Thread_t threadID)
00389 {
00390 int rc;
00391 if ((rc = pthread_cancel(threadID)) != 0)
00392 {
00393 BLOCXX_THROW(ThreadException, Format("pthread_cancel failed. error = %1(%2)", rc, strerror(rc)).c_str());
00394 }
00395 }
00396 #endif // #ifdef BLOCXX_USE_PTHREAD
00397
00398 #if defined(BLOCXX_WIN32)
00399
00400 namespace {
00401
00402 struct WThreadInfo
00403 {
00404 HANDLE handle;
00405 BLOCXX_NAMESPACE::Thread* pTheThread;
00406 };
00407
00408 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
00409 Win32ThreadMap g_threads;
00410 Mutex g_threadsGuard;
00411
00412 struct LocalThreadParm
00413 {
00414 ThreadFunction m_func;
00415 void* m_funcParm;
00416 };
00417
00419 extern "C" {
00420 unsigned __stdcall threadStarter(void* arg)
00421 {
00422 LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
00423 ThreadFunction func = parg->m_func;
00424 void* funcParm = parg->m_funcParm;
00425 delete parg;
00426 Int32 rval = (*func)(funcParm);
00427 ::_endthreadex(static_cast<unsigned>(rval));
00428 return rval;
00429 }
00430 }
00431
00433 void
00434 addThreadToMap(DWORD threadId, HANDLE threadHandle)
00435 {
00436 MutexLock ml(g_threadsGuard);
00437 WThreadInfo wi;
00438 wi.handle = threadHandle;
00439 wi.pTheThread = 0;
00440 g_threads[threadId] = wi;
00441 }
00442
00444 HANDLE
00445 getThreadHandle(DWORD threadId)
00446 {
00447 MutexLock ml(g_threadsGuard);
00448 HANDLE chdl = 0;
00449 Win32ThreadMap::iterator it = g_threads.find(threadId);
00450 if (it != g_threads.end())
00451 {
00452 chdl = it->second.handle;
00453 }
00454 return chdl;
00455 }
00456
00458 void
00459 setThreadPointer(DWORD threadId, Thread* pTheThread)
00460 {
00461 MutexLock ml(g_threadsGuard);
00462 Win32ThreadMap::iterator it = g_threads.find(threadId);
00463 if (it != g_threads.end())
00464 {
00465 it->second.pTheThread = pTheThread;
00466 }
00467 }
00468
00470 HANDLE
00471 removeThreadFromMap(DWORD threadId)
00472 {
00473 MutexLock ml(g_threadsGuard);
00474 HANDLE chdl = 0;
00475 Win32ThreadMap::iterator it = g_threads.find(threadId);
00476 if (it != g_threads.end())
00477 {
00478 chdl = it->second.handle;
00479 g_threads.erase(it);
00480 }
00481 return chdl;
00482 }
00483
00485 Thread*
00486 getThreadObject(DWORD threadId)
00487 {
00488 Thread* pTheThread = 0;
00489 MutexLock ml(g_threadsGuard);
00490 Win32ThreadMap::iterator it = g_threads.find(threadId);
00491 if (it != g_threads.end())
00492 {
00493 pTheThread = it->second.pTheThread;
00494 }
00495 return pTheThread;
00496 }
00497
00498 }
00499
00501
00502 int
00503 createThread(Thread_t& handle, ThreadFunction func,
00504 void* funcParm, UInt32 threadFlags)
00505 {
00506 int cc = -1;
00507 HANDLE hThread;
00508 unsigned threadId;
00509
00510 LocalThreadParm* parg = new LocalThreadParm;
00511 parg->m_func = func;
00512 parg->m_funcParm = funcParm;
00513 hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
00514 parg, 0, &threadId));
00515 if (hThread != 0)
00516 {
00517 addThreadToMap(threadId, hThread);
00518 handle = threadId;
00519 cc = 0;
00520 }
00521
00522 return cc;
00523 }
00525
00526 void
00527 exitThread(Thread_t&, Int32 rval)
00528 {
00529 ::_endthreadex(static_cast<unsigned>(rval));
00530 }
00531
00533
00534 UInt64 thread_t_ToUInt64(Thread_t thr)
00535 {
00536
00537 BLOCXX_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t)," Thread_t truncated!");
00538 return static_cast<UInt64>(thr);
00539 }
00540
00542
00543 void
00544 destroyThread(Thread_t& threadId)
00545 {
00546 HANDLE thdl = removeThreadFromMap(threadId);
00547 if (thdl != 0)
00548 {
00549 ::CloseHandle(thdl);
00550 }
00551 }
00553
00554 int
00555 setThreadDetached(Thread_t& handle)
00556 {
00557
00558 return 0;
00559 }
00561
00562 int
00563 joinThread(Thread_t& threadId, Int32& rvalArg)
00564 {
00565 int cc = -1;
00566 DWORD rval;
00567 HANDLE thdl = getThreadHandle(threadId);
00568 if (thdl != 0)
00569 {
00570 if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
00571 {
00572 if (::GetExitCodeThread(thdl, &rval) != 0)
00573 {
00574 rvalArg = static_cast<Int32>(rval);
00575 cc = 0;
00576 }
00577 }
00578 }
00579 return cc;
00580 }
00581
00583 void
00584 testCancel()
00585 {
00586 DWORD threadId = ThreadImpl::currentThread();
00587 Thread* pTheThread = getThreadObject(threadId);
00588 if (pTheThread)
00589 {
00590 NonRecursiveMutexLock l(pTheThread->m_cancelLock);
00591 if (pTheThread->m_cancelRequested)
00592 {
00593
00594
00595
00596
00597
00598 throw ThreadCancelledException();
00599 }
00600 }
00601 }
00603 void saveThreadInTLS(void* pThreadArg)
00604 {
00605 Thread* pThread = static_cast<Thread*>(pThreadArg);
00606 DWORD threadId = pThread->getId();
00607 setThreadPointer(threadId, pThread);
00608 }
00610 void sendSignalToThread(Thread_t threadID, int signo)
00611 {
00612 }
00614 void cancel(Thread_t threadId)
00615 {
00616 HANDLE thdl = getThreadHandle(threadId);
00617 if (thdl != 0)
00618 {
00619 ::TerminateThread(thdl, -1);
00620 }
00621 }
00622
00623 #endif // #ifdef BLOCXX_WIN32
00624 }
00625
00626 }
00627