00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/Thread.hpp"
00040 #include "blocxx/Assertion.hpp"
00041 #include "blocxx/Format.hpp"
00042 #include "blocxx/ThreadBarrier.hpp"
00043 #include "blocxx/NonRecursiveMutexLock.hpp"
00044 #include "blocxx/ExceptionIds.hpp"
00045
00046 #include <cstring>
00047 #include <cstdio>
00048 #include <cerrno>
00049 #include <iostream>
00050 #include <csignal>
00051 #include <cassert>
00052
00053 #ifdef BLOCXX_HAVE_OPENSSL
00054 #include <openssl/err.h>
00055 #endif
00056
00057 namespace BLOCXX_NAMESPACE
00058 {
00059
00061 BLOCXX_DEFINE_EXCEPTION_WITH_ID(Thread);
00062 BLOCXX_DEFINE_EXCEPTION_WITH_ID(CancellationDenied);
00064
00065 struct ThreadParam
00066 {
00067 ThreadParam(Thread* t, const ThreadDoneCallbackRef& c, const ThreadBarrier& b)
00068 : thread(t)
00069 , cb(c)
00070 , thread_barrier(b)
00071 {}
00072 Thread* thread;
00073 ThreadDoneCallbackRef cb;
00074 ThreadBarrier thread_barrier;
00075 };
00076 static Thread_t zeroThread();
00077 static Thread_t NULLTHREAD = zeroThread();
00079 static inline bool
00080 sameId(const Thread_t& t1, const Thread_t& t2)
00081 {
00082 return ThreadImpl::sameThreads(t1, t2);
00083 }
00085
00086 Thread::Thread()
00087 : m_id(NULLTHREAD)
00088 , m_isRunning(false)
00089 , m_isStarting(false)
00090 , m_joined(false)
00091 , m_cancelRequested(false)
00092 , m_cancelled(false)
00093 {
00094 }
00096
00097 Thread::~Thread()
00098 {
00099 try
00100 {
00101 if (!m_joined)
00102 {
00103 join();
00104 }
00105 assert(m_isRunning == false);
00106 if (!sameId(m_id, NULLTHREAD))
00107 {
00108 ThreadImpl::destroyThread(m_id);
00109 }
00110 }
00111 catch (...)
00112 {
00113
00114 }
00115 }
00117
00118 void
00119 Thread::start(const ThreadDoneCallbackRef& cb)
00120 {
00121 if (isRunning())
00122 {
00123 BLOCXX_THROW(ThreadException,
00124 "Thread::start - thread is already running");
00125 }
00126 if (!sameId(m_id, NULLTHREAD))
00127 {
00128 BLOCXX_THROW(ThreadException,
00129 "Thread::start - cannot start previously run thread");
00130 }
00131 m_isStarting = true;
00132 UInt32 flgs = BLOCXX_THREAD_FLG_JOINABLE;
00133 ThreadBarrier thread_barrier(2);
00134
00135 ThreadParam* p = new ThreadParam(this, cb, thread_barrier);
00136 if (ThreadImpl::createThread(m_id, threadRunner, p, flgs) != 0)
00137 {
00138 BLOCXX_THROW(ThreadException, "ThreadImpl::createThread failed");
00139 }
00140 m_isStarting = false;
00141 thread_barrier.wait();
00142 }
00144
00145 Int32
00146 Thread::join()
00147 {
00148 BLOCXX_ASSERT(!sameId(m_id, NULLTHREAD));
00149 Int32 rval;
00150 if (ThreadImpl::joinThread(m_id, rval) != 0)
00151 {
00152 BLOCXX_THROW(ThreadException,
00153 Format("Thread::join - ThreadImpl::joinThread: %1(%2)",
00154 errno, strerror(errno)).c_str());
00155 }
00156
00157 m_isRunning = false;
00158 m_joined = true;
00159 return rval;
00160 }
00162
00163
00164 Int32
00165 Thread::threadRunner(void* paramPtr)
00166 {
00167 Thread_t theThreadID;
00168 Int32 rval = -1;
00169 try
00170 {
00171
00172 BLOCXX_ASSERT(paramPtr != NULL);
00173 ThreadParam* pParam = static_cast<ThreadParam*>(paramPtr);
00174 Thread* pTheThread = pParam->thread;
00175 ThreadImpl::saveThreadInTLS(pTheThread);
00176 theThreadID = pTheThread->m_id;
00177 ThreadDoneCallbackRef cb = pParam->cb;
00178 ThreadBarrier thread_barrier = pParam->thread_barrier;
00179 delete pParam;
00180 pTheThread->m_isRunning = true;
00181 thread_barrier.wait();
00182
00183 try
00184 {
00185 rval = pTheThread->run();
00186 }
00187
00188 catch (ThreadCancelledException&)
00189 {
00190 }
00191 catch (Exception& ex)
00192 {
00193 #ifdef BLOCXX_DEBUG
00194 std::cerr << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00195 std::cerr << ex << std::endl;
00196 #endif
00197 pTheThread->doneRunning(cb);
00198
00199
00200 throw;
00201 }
00202 catch (...)
00203 {
00204 #ifdef BLOCXX_DEBUG
00205 std::cerr << "!!! Unknown Exception caught in Thread class" << std::endl;
00206 #endif
00207 pTheThread->doneRunning(cb);
00208
00209
00210 throw;
00211 }
00212
00213 pTheThread->doneRunning(cb);
00214
00215 }
00216 catch (Exception& ex)
00217 {
00218 #ifdef BLOCXX_DEBUG
00219 std::cerr << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00220 std::cerr << ex << std::endl;
00221 #endif
00222
00223 ThreadImpl::exitThread(theThreadID, rval);
00224 }
00225 catch (...)
00226 {
00227 #ifdef BLOCXX_DEBUG
00228 std::cerr << "!!! Unknown Exception caught in Thread class" << std::endl;
00229 #endif
00230
00231 ThreadImpl::exitThread(theThreadID, rval);
00232 }
00233
00234 ThreadImpl::exitThread(theThreadID, rval);
00235 return rval;
00236 }
00237
00239 void
00240 Thread::doneRunning(const ThreadDoneCallbackRef& cb)
00241 {
00242 NonRecursiveMutexLock l(m_cancelLock);
00243 m_isRunning = m_isStarting = false;
00244 m_cancelled = true;
00245 m_cancelCond.notifyAll();
00246 if (cb)
00247 {
00248 cb->notifyThreadDone(this);
00249 }
00250 #ifdef OW_HAVE_OPENSSL
00251
00252 ERR_remove_state(0);
00253 #endif
00254 }
00255
00257 static Thread_t
00258 zeroThread()
00259 {
00260 Thread_t zthr;
00261 ::memset(&zthr, 0, sizeof(zthr));
00262 return zthr;
00263 }
00265 void
00266 Thread::cooperativeCancel()
00267 {
00268 if (!isRunning())
00269 {
00270 return;
00271 }
00272
00273
00274 doCooperativeCancel();
00275 NonRecursiveMutexLock l(m_cancelLock);
00276 m_cancelRequested = true;
00277
00278 #if !defined(BLOCXX_WIN32)
00279
00280
00281
00282
00283 try
00284 {
00285 ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00286 }
00287 catch (ThreadException&)
00288 {
00289 }
00290 #endif
00291 }
00293 bool
00294 Thread::definitiveCancel(UInt32 waitForCooperativeSecs)
00295 {
00296 if (!isRunning())
00297 {
00298 return true;
00299 }
00300
00301
00302 doCooperativeCancel();
00303 NonRecursiveMutexLock l(m_cancelLock);
00304 m_cancelRequested = true;
00305
00306 #if !defined(BLOCXX_WIN32)
00307
00308
00309
00310
00311 try
00312 {
00313 ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00314 }
00315 catch (ThreadException&)
00316 {
00317 }
00318 #endif
00319
00320 while (!m_cancelled && isRunning())
00321 {
00322 if (!m_cancelCond.timedWait(l, waitForCooperativeSecs, 0))
00323 {
00324
00325 doDefinitiveCancel();
00326
00327 if (!m_cancelled && isRunning())
00328 {
00329 this->cancel();
00330 }
00331 return false;
00332 }
00333 }
00334 return true;
00335 }
00337 void
00338 Thread::cancel()
00339 {
00340
00341
00342 try
00343 {
00344 ThreadImpl::cancel(m_id);
00345 }
00346 catch (ThreadException&)
00347 {
00348 }
00349 m_cancelled = true;
00350 }
00352 void
00353 Thread::testCancel()
00354 {
00355 ThreadImpl::testCancel();
00356 }
00358 void
00359 Thread::doCooperativeCancel()
00360 {
00361 }
00363 void
00364 Thread::doDefinitiveCancel()
00365 {
00366 }
00367
00368 }
00369