00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/ThreadPool.hpp"
00039 #include "blocxx/Array.hpp"
00040 #include "blocxx/Thread.hpp"
00041 #include "blocxx/NonRecursiveMutex.hpp"
00042 #include "blocxx/NonRecursiveMutexLock.hpp"
00043 #include "blocxx/Condition.hpp"
00044 #include "blocxx/Format.hpp"
00045 #include "blocxx/Mutex.hpp"
00046 #include "blocxx/MutexLock.hpp"
00047 #include "blocxx/NullLogger.hpp"
00048
00049 #include <deque>
00050
00051 #ifdef BLOCXX_DEBUG
00052 #include <iostream>
00053 #endif
00054
00055 namespace BLOCXX_NAMESPACE
00056 {
00057
00058 BLOCXX_DEFINE_EXCEPTION(ThreadPool);
00059
00060
00061 #define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { if ((logger)) BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
00062 #define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { if ((logger)) BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
00063
00065 class ThreadPoolImpl : public IntrusiveCountableBase
00066 {
00067 public:
00068
00069 virtual bool addWork(const RunnableRef& work, bool blockWhenFull) = 0;
00070 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs) = 0;
00071 virtual void waitForEmptyQueue() = 0;
00072 virtual ~ThreadPoolImpl()
00073 {
00074 }
00075 };
00076 namespace {
00077 class FixedSizePoolImpl;
00079 class FixedSizePoolWorkerThread : public Thread
00080 {
00081 public:
00082 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
00083 : Thread()
00084 , m_thePool(thePool)
00085 {
00086 }
00087 virtual Int32 run();
00088 private:
00089 virtual void doCooperativeCancel()
00090 {
00091 MutexLock lock(m_guard);
00092 if (m_currentRunnable)
00093 {
00094 m_currentRunnable->doCooperativeCancel();
00095 }
00096 }
00097 virtual void doDefinitiveCancel()
00098 {
00099 MutexLock lock(m_guard);
00100 if (m_currentRunnable)
00101 {
00102 m_currentRunnable->doCooperativeCancel();
00103 }
00104 }
00105
00106 FixedSizePoolImpl* m_thePool;
00107
00108 Mutex m_guard;
00109 RunnableRef m_currentRunnable;
00110
00111
00112 FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
00113 FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
00114 };
00116 class CommonPoolImpl : public ThreadPoolImpl
00117 {
00118 protected:
00119 CommonPoolImpl(UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00120 : m_maxQueueSize(maxQueueSize)
00121 , m_queueClosed(false)
00122 , m_shutdown(false)
00123 , m_logger(logger)
00124 , m_poolName(poolName)
00125 {
00126 }
00127
00128 virtual ~CommonPoolImpl()
00129 {
00130 }
00131
00132
00133 virtual bool queueIsFull() const
00134 {
00135 return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
00136 }
00137
00138
00139 bool queueClosed() const
00140 {
00141 return m_shutdown || m_queueClosed;
00142 }
00143
00144 bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00145 {
00146 NonRecursiveMutexLock l(m_queueLock);
00147
00148 if (queueClosed())
00149 {
00150 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is already closed. Why are you trying to shutdown again?");
00151 return false;
00152 }
00153 m_queueClosed = true;
00154 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue closed");
00155
00156 if (finishWorkInQueue)
00157 {
00158 while (m_queue.size() != 0)
00159 {
00160 if (shutdownSecs < 0)
00161 {
00162 BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting forever for queue to empty");
00163 m_queueEmpty.wait(l);
00164 }
00165 else
00166 {
00167 BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting w/timout for queue to empty");
00168 if (!m_queueEmpty.timedWait(l, shutdownSecs))
00169 {
00170 BLOCXX_POOL_LOG_DEBUG(m_logger, "Wait timed out. Work in queue will be discarded.");
00171 break;
00172 }
00173 }
00174 }
00175 }
00176 m_shutdown = true;
00177 return true;
00178 }
00179
00180 virtual void waitForEmptyQueue()
00181 {
00182 NonRecursiveMutexLock l(m_queueLock);
00183 while (m_queue.size() != 0)
00184 {
00185 BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting for empty queue");
00186 m_queueEmpty.wait(l);
00187 }
00188 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue empty: the wait is over");
00189 }
00190
00191 void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00192 {
00193 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownSecs))
00194 {
00195 return;
00196 }
00197
00198
00199 m_queueNotEmpty.notifyAll();
00200 m_queueNotFull.notifyAll();
00201
00202 if (shutdownSecs >= 0)
00203 {
00204
00205 for (UInt32 i = 0; i < m_threads.size(); ++i)
00206 {
00207 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling cooperativeCancel on thread %1", i));
00208 m_threads[i]->cooperativeCancel();
00209 }
00210
00211 for (UInt32 i = 0; i < m_threads.size(); ++i)
00212 {
00213 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling definitiveCancel on thread %1", i));
00214 if (!m_threads[i]->definitiveCancel(shutdownSecs))
00215 {
00216 BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
00217 }
00218 }
00219 }
00220
00221 for (UInt32 i = 0; i < m_threads.size(); ++i)
00222 {
00223 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("calling join() on thread %1", i));
00224 m_threads[i]->join();
00225 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("join() finished for thread %1", i));
00226 }
00227 }
00228
00229 RunnableRef getWorkFromQueue(bool waitForWork)
00230 {
00231 NonRecursiveMutexLock l(m_queueLock);
00232 while ((m_queue.size() == 0) && (!m_shutdown))
00233 {
00234 if (waitForWork)
00235 {
00236 BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting for work");
00237 m_queueNotEmpty.wait(l);
00238 }
00239 else
00240 {
00241
00242
00243 if (!m_queueNotEmpty.timedWait(l,1))
00244 {
00245 BLOCXX_POOL_LOG_DEBUG(m_logger, "No work after 1 sec. I'm not waiting any longer");
00246 return RunnableRef();
00247 }
00248 }
00249 }
00250
00251 if (m_shutdown)
00252 {
00253 BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
00254 return RunnableRef();
00255 }
00256
00257 RunnableRef work = m_queue.front();
00258 m_queue.pop_front();
00259
00260
00261 if (!queueIsFull())
00262 {
00263 m_queueNotFull.notifyAll();
00264 }
00265
00266
00267 if (m_queue.size() == 0)
00268 {
00269 m_queueEmpty.notifyAll();
00270 }
00271 BLOCXX_POOL_LOG_DEBUG(m_logger, "A thread got some work to do");
00272 return work;
00273 }
00274
00275
00276 UInt32 m_maxQueueSize;
00277
00278 Array<ThreadRef> m_threads;
00279 std::deque<RunnableRef> m_queue;
00280 bool m_queueClosed;
00281 bool m_shutdown;
00282
00283 NonRecursiveMutex m_queueLock;
00284 Condition m_queueNotFull;
00285 Condition m_queueEmpty;
00286 Condition m_queueNotEmpty;
00287 LoggerRef m_logger;
00288 String m_poolName;
00289 };
00290 class FixedSizePoolImpl : public CommonPoolImpl
00291 {
00292 public:
00293 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00294 : CommonPoolImpl(maxQueueSize, logger, poolName)
00295 {
00296
00297 m_threads.reserve(numThreads);
00298 for (UInt32 i = 0; i < numThreads; ++i)
00299 {
00300 m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
00301 }
00302 for (UInt32 i = 0; i < numThreads; ++i)
00303 {
00304 m_threads[i]->start();
00305 }
00306 BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
00307 }
00308
00309 virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00310 {
00311
00312 if (!work)
00313 {
00314 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00315 return false;
00316 }
00317 NonRecursiveMutexLock l(m_queueLock);
00318 if (!blockWhenFull && queueIsFull())
00319 {
00320 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00321 return false;
00322 }
00323 while ( queueIsFull() && !queueClosed() )
00324 {
00325 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00326 m_queueNotFull.wait(l);
00327 }
00328
00329 if (queueClosed())
00330 {
00331 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00332 return false;
00333 }
00334 m_queue.push_back(work);
00335
00336
00337 if (m_queue.size() == 1)
00338 {
00339 BLOCXX_POOL_LOG_DEBUG(m_logger, "Waking up sleepy workers");
00340 m_queueNotEmpty.notifyAll();
00341 }
00342
00343 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00344 return true;
00345 }
00346
00347
00348 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00349 {
00350 shutdownThreads(finishWorkInQueue, shutdownSecs);
00351 }
00352 virtual ~FixedSizePoolImpl()
00353 {
00354
00355 try
00356 {
00357
00358 if (!queueClosed())
00359 {
00360
00361
00362 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00363 }
00364 }
00365 catch (...)
00366 {
00367 }
00368 }
00369 private:
00370 friend class FixedSizePoolWorkerThread;
00371 };
00372 void runRunnable(const RunnableRef& work)
00373 {
00374
00375 try
00376 {
00377 work->run();
00378 }
00379 catch (ThreadCancelledException&)
00380 {
00381 throw;
00382 }
00383 catch (Exception& ex)
00384 {
00385 #ifdef BLOCXX_DEBUG
00386 std::cerr << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
00387 #endif
00388 }
00389 catch(std::exception& ex)
00390 {
00391 #ifdef BLOCXX_DEBUG
00392 std::cerr << "!!! std::exception what = " << ex.what() << std::endl;
00393 #endif
00394 }
00395 catch (...)
00396 {
00397 #ifdef BLOCXX_DEBUG
00398 std::cerr << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
00399 #endif
00400 }
00401 }
00402 Int32 FixedSizePoolWorkerThread::run()
00403 {
00404 while (true)
00405 {
00406
00407 RunnableRef work = m_thePool->getWorkFromQueue(true);
00408 if (!work)
00409 {
00410 return 0;
00411 }
00412
00413 {
00414 MutexLock lock(m_guard);
00415 m_currentRunnable = work;
00416 }
00417 runRunnable(work);
00418 {
00419 MutexLock lock(m_guard);
00420 m_currentRunnable = 0;
00421 }
00422 }
00423 return 0;
00424 }
00425 class DynamicSizePoolImpl;
00427 class DynamicSizePoolWorkerThread : public Thread
00428 {
00429 public:
00430 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
00431 : Thread()
00432 , m_thePool(thePool)
00433 {
00434 }
00435 virtual Int32 run();
00436 private:
00437 virtual void doCooperativeCancel()
00438 {
00439 MutexLock lock(m_guard);
00440 if (m_currentRunnable)
00441 {
00442 m_currentRunnable->doCooperativeCancel();
00443 }
00444 }
00445 virtual void doDefinitiveCancel()
00446 {
00447 MutexLock lock(m_guard);
00448 if (m_currentRunnable)
00449 {
00450 m_currentRunnable->doCooperativeCancel();
00451 }
00452 }
00453
00454 DynamicSizePoolImpl* m_thePool;
00455
00456 Mutex m_guard;
00457 RunnableRef m_currentRunnable;
00458
00459
00460 DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
00461 DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
00462 };
00464 class DynamicSizePoolImpl : public CommonPoolImpl
00465 {
00466 public:
00467 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00468 : CommonPoolImpl(maxQueueSize, logger, poolName)
00469 , m_maxThreads(maxThreads)
00470 {
00471 }
00472
00473 virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00474 {
00475
00476 if (!work)
00477 {
00478 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00479 return false;
00480 }
00481 NonRecursiveMutexLock l(m_queueLock);
00482
00483
00484 if (queueClosed())
00485 {
00486 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00487 return false;
00488 }
00489
00490
00491
00492
00493 size_t i = 0;
00494 while (i < m_threads.size())
00495 {
00496 if (!m_threads[i]->isRunning())
00497 {
00498 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
00499 m_threads[i]->join();
00500 m_threads.remove(i);
00501 }
00502 else
00503 {
00504 ++i;
00505 }
00506 }
00507
00508 if (!blockWhenFull && queueIsFull())
00509 {
00510 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00511 return false;
00512 }
00513 while ( queueIsFull() && !queueClosed() )
00514 {
00515 BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00516 m_queueNotFull.wait(l);
00517 }
00518
00519 m_queue.push_back(work);
00520
00521 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00522
00523
00524
00525
00526
00527
00528
00529 l.release();
00530 m_queueNotEmpty.notifyOne();
00531 Thread::yield();
00532 l.lock();
00533
00534
00535 if (!m_queue.empty() && m_threads.size() < m_maxThreads)
00536 {
00537 ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
00538 m_threads.push_back(theThread);
00539 BLOCXX_POOL_LOG_DEBUG(m_logger, "About to start a new thread");
00540 theThread->start();
00541 BLOCXX_POOL_LOG_DEBUG(m_logger, "New thread started");
00542 }
00543 return true;
00544 }
00545
00546
00547 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00548 {
00549 shutdownThreads(finishWorkInQueue, shutdownSecs);
00550 }
00551 virtual ~DynamicSizePoolImpl()
00552 {
00553
00554 try
00555 {
00556
00557 if (!queueClosed())
00558 {
00559
00560
00561 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00562 }
00563 }
00564 catch (...)
00565 {
00566 }
00567 }
00568 protected:
00569 UInt32 getMaxThreads() const
00570 {
00571 return m_maxThreads;
00572 }
00573
00574 private:
00575
00576 UInt32 m_maxThreads;
00577 friend class DynamicSizePoolWorkerThread;
00578 };
00579 Int32 DynamicSizePoolWorkerThread::run()
00580 {
00581 while (true)
00582 {
00583
00584 RunnableRef work = m_thePool->getWorkFromQueue(false);
00585 if (!work)
00586 {
00587 return 0;
00588 }
00589
00590 {
00591 MutexLock lock(m_guard);
00592 m_currentRunnable = work;
00593 }
00594 runRunnable(work);
00595 {
00596 MutexLock lock(m_guard);
00597 m_currentRunnable = 0;
00598 }
00599 }
00600 return 0;
00601 }
00602
00604 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
00605 {
00606 public:
00607 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const LoggerRef& logger, const String& poolName)
00608 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
00609 {
00610 }
00611
00612 virtual ~DynamicSizeNoQueuePoolImpl()
00613 {
00614 }
00615
00616
00617 virtual bool queueIsFull() const
00618 {
00619
00620
00621 size_t freeThreads = getMaxThreads() - m_threads.size();
00622 return (freeThreads <= m_queue.size());
00623 }
00624
00625 };
00626
00627 }
00629 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger_, const String& poolName)
00630 {
00631 LoggerRef logger(logger_);
00632 if (!logger)
00633 {
00634 logger = LoggerRef(new NullLogger());
00635 }
00636 switch (poolType)
00637 {
00638 case FIXED_SIZE:
00639 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00640 break;
00641 case DYNAMIC_SIZE:
00642 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00643 break;
00644 case DYNAMIC_SIZE_NO_QUEUE:
00645 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00646 break;
00647 }
00648 }
00650 bool ThreadPool::addWork(const RunnableRef& work)
00651 {
00652 return m_impl->addWork(work, true);
00653 }
00655 bool ThreadPool::tryAddWork(const RunnableRef& work)
00656 {
00657 return m_impl->addWork(work, false);
00658 }
00660 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00661 {
00662 m_impl->shutdown(finishWorkInQueue, shutdownSecs);
00663 }
00665 void ThreadPool::waitForEmptyQueue()
00666 {
00667 m_impl->waitForEmptyQueue();
00668 }
00670 ThreadPool::~ThreadPool()
00671 {
00672 }
00674 ThreadPool::ThreadPool(const ThreadPool& x)
00675 : IntrusiveCountableBase(x)
00676 , m_impl(x.m_impl)
00677 {
00678 }
00680 ThreadPool& ThreadPool::operator=(const ThreadPool& x)
00681 {
00682 m_impl = x.m_impl;
00683 return *this;
00684 }
00685
00686 }
00687