ThreadPool.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2004 Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2005 Novell, Inc. All rights reserved.
00004 *
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 *
00008 *  - Redistributions of source code must retain the above copyright notice,
00009 *    this list of conditions and the following disclaimer.
00010 *
00011 *  - Redistributions in binary form must reproduce the above copyright notice,
00012 *    this list of conditions and the following disclaimer in the documentation
00013 *    and/or other materials provided with the distribution.
00014 *
00015 *  - Neither the name of Vintela, Inc., Novell, Inc., nor the names of its
00016 *    contributors may be used to endorse or promote products derived from this
00017 *    software without specific prior written permission.
00018 *
00019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00020 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00021 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00022 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc., Novell, Inc., OR THE 
00023 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00024 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00025 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
00026 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00027 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
00028 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
00029 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 *******************************************************************************/
00031 
00032 
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/ThreadPool.hpp"
00039 #include "blocxx/Array.hpp"
00040 #include "blocxx/Thread.hpp"
00041 #include "blocxx/NonRecursiveMutex.hpp"
00042 #include "blocxx/NonRecursiveMutexLock.hpp"
00043 #include "blocxx/Condition.hpp"
00044 #include "blocxx/Format.hpp"
00045 #include "blocxx/Mutex.hpp"
00046 #include "blocxx/MutexLock.hpp"
00047 #include "blocxx/NullLogger.hpp"
00048 
00049 #include <deque>
00050 
00051 #ifdef BLOCXX_DEBUG     
00052 #include <iostream> // for cerr
00053 #endif
00054 
00055 namespace BLOCXX_NAMESPACE
00056 {
00057 
00058 BLOCXX_DEFINE_EXCEPTION(ThreadPool);
00059 
// Pool-aware logging helpers. Each one prefixes the message with m_poolName, so
// they can only be expanded inside a scope where m_poolName is visible.
// The logger may be null, in which case nothing is logged.
// `arg` is parenthesized so that an expression argument cannot re-associate with
// the surrounding operator+ chain.
#define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { if ((logger)) BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { if ((logger)) BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + (arg)); } while (0)
00063 
00065 class ThreadPoolImpl : public IntrusiveCountableBase
00066 {
00067 public:
00068    // returns true if work is placed in the queue to be run and false if not.
00069    virtual bool addWork(const RunnableRef& work, bool blockWhenFull) = 0;
00070    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs) = 0;
00071    virtual void waitForEmptyQueue() = 0;
00072    virtual ~ThreadPoolImpl()
00073    {
00074    }
00075 };
00076 namespace {
00077 class FixedSizePoolImpl;
00079 class FixedSizePoolWorkerThread : public Thread
00080 {
00081 public:
00082    FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
00083       : Thread()
00084       , m_thePool(thePool)
00085    {
00086    }
00087    virtual Int32 run();
00088 private:
00089    virtual void doCooperativeCancel()
00090    {
00091       MutexLock lock(m_guard);
00092       if (m_currentRunnable)
00093       {
00094          m_currentRunnable->doCooperativeCancel();
00095       }
00096    }
00097    virtual void doDefinitiveCancel()
00098    {
00099       MutexLock lock(m_guard);
00100       if (m_currentRunnable)
00101       {
00102          m_currentRunnable->doCooperativeCancel();
00103       }
00104    }
00105 
00106    FixedSizePoolImpl* m_thePool;
00107 
00108    Mutex m_guard;
00109    RunnableRef m_currentRunnable;
00110 
00111    // non-copyable
00112    FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
00113    FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
00114 };
00116 class CommonPoolImpl : public ThreadPoolImpl
00117 {
00118 protected:
00119    CommonPoolImpl(UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00120       : m_maxQueueSize(maxQueueSize)
00121       , m_queueClosed(false)
00122       , m_shutdown(false)
00123       , m_logger(logger)
00124       , m_poolName(poolName)
00125    {
00126    }
00127 
00128    virtual ~CommonPoolImpl()
00129    {
00130    }
00131    
00132    // assumes that m_queueLock is locked. DynamicSizeNoQueuePoolImpl overrides this.
00133    virtual bool queueIsFull() const
00134    {
00135       return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
00136    }
00137    
00138    // assumes that m_queueLock is locked
00139    bool queueClosed() const
00140    {
00141       return m_shutdown || m_queueClosed;
00142    }
00143    
00144    bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00145    {
00146       NonRecursiveMutexLock l(m_queueLock);
00147       // the pool is in the process of being destroyed
00148       if (queueClosed())
00149       {
00150          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is already closed.  Why are you trying to shutdown again?");
00151          return false;
00152       }
00153       m_queueClosed = true;
00154       BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue closed");
00155       
00156       if (finishWorkInQueue)
00157       {
00158          while (m_queue.size() != 0)
00159          {
00160             if (shutdownSecs < 0)
00161             {
00162                BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting forever for queue to empty");
00163                m_queueEmpty.wait(l);
00164             }
00165             else
00166             {
00167                BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting w/timout for queue to empty");
00168                if (!m_queueEmpty.timedWait(l, shutdownSecs))
00169                {
00170                   BLOCXX_POOL_LOG_DEBUG(m_logger, "Wait timed out. Work in queue will be discarded.");
00171                   break; // timed out
00172                }
00173             }
00174          }
00175       }
00176       m_shutdown = true;
00177       return true;
00178    }
00179 
00180    virtual void waitForEmptyQueue()
00181    {
00182       NonRecursiveMutexLock l(m_queueLock);
00183       while (m_queue.size() != 0)
00184       {
00185          BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting for empty queue");
00186          m_queueEmpty.wait(l);
00187       }
00188       BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue empty: the wait is over");
00189    }
00190    
00191    void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00192    {
00193       if (!finishOffWorkInQueue(finishWorkInQueue, shutdownSecs))
00194       {
00195          return;
00196       }
00197 
00198       // Wake up any workers so they recheck shutdown flag
00199       m_queueNotEmpty.notifyAll();
00200       m_queueNotFull.notifyAll();
00201 
00202       if (shutdownSecs >= 0)
00203       {
00204          // Set cooperative thread cancellation flag
00205          for (UInt32 i = 0; i < m_threads.size(); ++i)
00206          {
00207             BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling cooperativeCancel on thread %1", i));
00208             m_threads[i]->cooperativeCancel();
00209          }
00210          // If any still haven't shut down, definitiveCancel will kill them.
00211          for (UInt32 i = 0; i < m_threads.size(); ++i)
00212          {
00213             BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling definitiveCancel on thread %1", i));
00214             if (!m_threads[i]->definitiveCancel(shutdownSecs))
00215             {
00216                BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
00217             }
00218          }
00219       }
00220       // Clean up after the threads and/or wait for them to exit.
00221       for (UInt32 i = 0; i < m_threads.size(); ++i)
00222       {
00223          BLOCXX_POOL_LOG_DEBUG(m_logger, Format("calling join() on thread %1", i));
00224          m_threads[i]->join();
00225          BLOCXX_POOL_LOG_DEBUG(m_logger, Format("join() finished for thread %1", i));
00226       }
00227    }
00228 
00229    RunnableRef getWorkFromQueue(bool waitForWork)
00230    {
00231       NonRecursiveMutexLock l(m_queueLock);
00232       while ((m_queue.size() == 0) && (!m_shutdown))
00233       {
00234          if (waitForWork)
00235          {
00236             BLOCXX_POOL_LOG_DEBUG(m_logger, "Waiting for work");
00237             m_queueNotEmpty.wait(l);
00238          }
00239          else
00240          {
00241             // wait 1 sec for work, to more efficiently handle a stream
00242             // of single requests.
00243             if (!m_queueNotEmpty.timedWait(l,1))
00244             {
00245                BLOCXX_POOL_LOG_DEBUG(m_logger, "No work after 1 sec. I'm not waiting any longer");
00246                return RunnableRef();
00247             }
00248          }
00249       }
00250       // check to see if a shutdown started while the thread was sleeping
00251       if (m_shutdown)
00252       {
00253          BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
00254          return RunnableRef();
00255       }
00256 
00257       RunnableRef work = m_queue.front();
00258       m_queue.pop_front();
00259 
00260       // handle threads waiting in addWork().
00261       if (!queueIsFull())
00262       {
00263          m_queueNotFull.notifyAll();
00264       }
00265 
00266       // handle waiting shutdown thread or callers of waitForEmptyQueue()
00267       if (m_queue.size() == 0)
00268       {
00269          m_queueEmpty.notifyAll();
00270       }
00271       BLOCXX_POOL_LOG_DEBUG(m_logger, "A thread got some work to do");
00272       return work;
00273    }
00274 
00275    // pool characteristics
00276    UInt32 m_maxQueueSize;
00277    // pool state
00278    Array<ThreadRef> m_threads;
00279    std::deque<RunnableRef> m_queue;
00280    bool m_queueClosed;
00281    bool m_shutdown;
00282    // pool synchronization
00283    NonRecursiveMutex m_queueLock;
00284    Condition m_queueNotFull;
00285    Condition m_queueEmpty;
00286    Condition m_queueNotEmpty;
00287    LoggerRef m_logger;
00288    String m_poolName;
00289 };
00290 class FixedSizePoolImpl : public CommonPoolImpl
00291 {
00292 public:
00293    FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00294       : CommonPoolImpl(maxQueueSize, logger, poolName)
00295    {
00296       // create the threads and start them up.
00297       m_threads.reserve(numThreads);
00298       for (UInt32 i = 0; i < numThreads; ++i)
00299       {
00300          m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
00301       }
00302       for (UInt32 i = 0; i < numThreads; ++i)
00303       {
00304          m_threads[i]->start();
00305       }
00306       BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
00307    }
00308    // returns true if work is placed in the queue to be run and false if not.
00309    virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00310    {
00311       // check precondition: work != NULL
00312       if (!work)
00313       {
00314          BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00315          return false;
00316       }
00317       NonRecursiveMutexLock l(m_queueLock);
00318       if (!blockWhenFull && queueIsFull())
00319       {
00320          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00321          return false;
00322       }
00323       while ( queueIsFull() && !queueClosed() )
00324       {
00325          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00326          m_queueNotFull.wait(l);
00327       }
00328       // the pool is in the process of being destroyed
00329       if (queueClosed())
00330       {
00331          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00332          return false;
00333       }
00334       m_queue.push_back(work);
00335       
00336       // if the queue was empty, there may be workers just sitting around, so we need to wake them up!
00337       if (m_queue.size() == 1)
00338       {
00339          BLOCXX_POOL_LOG_DEBUG(m_logger, "Waking up sleepy workers");
00340          m_queueNotEmpty.notifyAll();
00341       }
00342 
00343       BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00344       return true;
00345    }
00346 
00347    // we keep this around so it can be called in the destructor
00348    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00349    {
00350       shutdownThreads(finishWorkInQueue, shutdownSecs);
00351    }
00352    virtual ~FixedSizePoolImpl()
00353    {
00354       // can't let exception escape the destructor
00355       try
00356       {
00357          // don't need a lock here, because we're the only thread left.
00358          if (!queueClosed())
00359          {
00360             // Make sure the pool is shutdown.
00361             // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
00362             this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00363          }
00364       }
00365       catch (...)
00366       {
00367       }
00368    }
00369 private:
00370    friend class FixedSizePoolWorkerThread;
00371 };
00372 void runRunnable(const RunnableRef& work)
00373 {
00374    // don't let exceptions escape, we need to keep going, except for ThreadCancelledException, in which case we need to stop.
00375    try
00376    {
00377       work->run();
00378    }
00379    catch (ThreadCancelledException&)
00380    {
00381       throw;
00382    }
00383    catch (Exception& ex)
00384    {
00385 #ifdef BLOCXX_DEBUG     
00386       std::cerr << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
00387 #endif
00388    }
00389    catch(std::exception& ex)
00390    {
00391 #ifdef BLOCXX_DEBUG
00392       std::cerr << "!!! std::exception what = " << ex.what() << std::endl;
00393 #endif
00394    }
00395    catch (...)
00396    {
00397 #ifdef BLOCXX_DEBUG     
00398       std::cerr << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
00399 #endif
00400    }
00401 }
00402 Int32 FixedSizePoolWorkerThread::run()
00403 {
00404    while (true)
00405    {
00406       // check queue for work
00407       RunnableRef work = m_thePool->getWorkFromQueue(true);
00408       if (!work)
00409       {
00410          return 0;
00411       }
00412       // save this off so it can be cancelled by another thread.
00413       {
00414          MutexLock lock(m_guard);
00415          m_currentRunnable = work;
00416       }
00417       runRunnable(work);
00418       {
00419          MutexLock lock(m_guard);
00420          m_currentRunnable = 0;
00421       }
00422    }
00423    return 0;
00424 }
00425 class DynamicSizePoolImpl;
00427 class DynamicSizePoolWorkerThread : public Thread
00428 {
00429 public:
00430    DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
00431       : Thread()
00432       , m_thePool(thePool)
00433    {
00434    }
00435    virtual Int32 run();
00436 private:
00437    virtual void doCooperativeCancel()
00438    {
00439       MutexLock lock(m_guard);
00440       if (m_currentRunnable)
00441       {
00442          m_currentRunnable->doCooperativeCancel();
00443       }
00444    }
00445    virtual void doDefinitiveCancel()
00446    {
00447       MutexLock lock(m_guard);
00448       if (m_currentRunnable)
00449       {
00450          m_currentRunnable->doCooperativeCancel();
00451       }
00452    }
00453 
00454    DynamicSizePoolImpl* m_thePool;
00455 
00456    Mutex m_guard;
00457    RunnableRef m_currentRunnable;
00458 
00459    // non-copyable
00460    DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
00461    DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
00462 };
00464 class DynamicSizePoolImpl : public CommonPoolImpl
00465 {
00466 public:
00467    DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00468       : CommonPoolImpl(maxQueueSize, logger, poolName)
00469       , m_maxThreads(maxThreads)
00470    {
00471    }
00472    // returns true if work is placed in the queue to be run and false if not.
00473    virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00474    {
00475       // check precondition: work != NULL
00476       if (!work)
00477       {
00478          BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00479          return false;
00480       }
00481       NonRecursiveMutexLock l(m_queueLock);
00482 
00483       // the pool is in the process of being destroyed
00484       if (queueClosed())
00485       {
00486          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00487          return false;
00488       }
00489 
00490       // Can't touch m_threads until *after* we check for the queue being closed, shutdown 
00491       // requires that m_threads not change after the queue is closed.
00492       // Now clean up dead threads (before we add the new one, so we don't need to check it)
00493       size_t i = 0;
00494       while (i < m_threads.size())
00495       {
00496          if (!m_threads[i]->isRunning())
00497          {
00498             BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
00499             m_threads[i]->join();
00500             m_threads.remove(i);
00501          }
00502          else
00503          {
00504             ++i;
00505          }
00506       }
00507 
00508       if (!blockWhenFull && queueIsFull())
00509       {
00510          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00511          return false;
00512       }
00513       while ( queueIsFull() && !queueClosed() )
00514       {
00515          BLOCXX_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00516          m_queueNotFull.wait(l);
00517       }
00518       
00519       m_queue.push_back(work);
00520       
00521       BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00522 
00523       // Release the lock and wake up a thread waiting for work in the queue
00524       // This bit of code is a race condition with the thread,
00525       // but if we acquire the lock again before it does, then we
00526       // properly handle that case.  The only disadvantage if we win
00527       // the "race" is that we'll unnecessarily start a new thread.
00528       // In practice it works all the time.
00529       l.release();
00530       m_queueNotEmpty.notifyOne();
00531       Thread::yield(); // give the thread a chance to run
00532       l.lock();
00533 
00534       // Start up a new thread to handle the work in the queue.
00535       if (!m_queue.empty() && m_threads.size() < m_maxThreads)
00536       {
00537          ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
00538          m_threads.push_back(theThread);
00539          BLOCXX_POOL_LOG_DEBUG(m_logger, "About to start a new thread");
00540          theThread->start();
00541          BLOCXX_POOL_LOG_DEBUG(m_logger, "New thread started");
00542       }
00543       return true;
00544    }
00545 
00546    // we keep this around so it can be called in the destructor
00547    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00548    {
00549       shutdownThreads(finishWorkInQueue, shutdownSecs);
00550    }
00551    virtual ~DynamicSizePoolImpl()
00552    {
00553       // can't let exception escape the destructor
00554       try
00555       {
00556          // don't need a lock here, because we're the only thread left.
00557          if (!queueClosed())
00558          {
00559             // Make sure the pool is shutdown.
00560             // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
00561             this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00562          }
00563       }
00564       catch (...)
00565       {
00566       }
00567    }
00568 protected:
00569    UInt32 getMaxThreads() const
00570    {
00571       return m_maxThreads;
00572    }
00573 
00574 private:
00575    // pool characteristics
00576    UInt32 m_maxThreads;
00577    friend class DynamicSizePoolWorkerThread;
00578 };
00579 Int32 DynamicSizePoolWorkerThread::run()
00580 {
00581    while (true)
00582    {
00583       // check queue for work
00584       RunnableRef work = m_thePool->getWorkFromQueue(false);
00585       if (!work)
00586       {
00587          return 0;
00588       }
00589       // save this off so it can be cancelled by another thread.
00590       {
00591          MutexLock lock(m_guard);
00592          m_currentRunnable = work;
00593       }
00594       runRunnable(work);
00595       {
00596          MutexLock lock(m_guard);
00597          m_currentRunnable = 0;
00598       }
00599    }
00600    return 0;
00601 }
00602 
00604 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
00605 {
00606 public:
00607    DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const LoggerRef& logger, const String& poolName)
00608       : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName) // allow queue in superclass, but prevent it from having any backlog
00609    {
00610    }
00611 
00612    virtual ~DynamicSizeNoQueuePoolImpl()
00613    {
00614    }
00615 
00616    // the only difference between this class and DynamicSizePoolImpl is that we change the definition of queueIsFull()
00617    virtual bool queueIsFull() const
00618    {
00619       // don't let the queue get bigger than the number of free threads. This effectively prevents work from being 
00620       // queued up which can't be immediately serviced.
00621       size_t freeThreads = getMaxThreads() -  m_threads.size(); 
00622       return (freeThreads <= m_queue.size());
00623    }
00624 
00625 };
00626 
00627 } // end anonymous namespace
00629 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger_, const String& poolName)
00630 {
00631    LoggerRef logger(logger_);
00632    if (!logger)
00633    {
00634       logger = LoggerRef(new NullLogger());
00635    }
00636    switch (poolType)
00637    {
00638       case FIXED_SIZE:
00639          m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00640          break;
00641       case DYNAMIC_SIZE:
00642          m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00643          break;
00644       case DYNAMIC_SIZE_NO_QUEUE:
00645          m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00646          break;
00647    }
00648 }
00650 bool ThreadPool::addWork(const RunnableRef& work)
00651 {
00652    return m_impl->addWork(work, true);
00653 }
00655 bool ThreadPool::tryAddWork(const RunnableRef& work)
00656 {
00657    return m_impl->addWork(work, false);
00658 }
00660 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00661 {
00662    m_impl->shutdown(finishWorkInQueue, shutdownSecs);
00663 }
00665 void ThreadPool::waitForEmptyQueue()
00666 {
00667    m_impl->waitForEmptyQueue();
00668 }
00670 ThreadPool::~ThreadPool()
00671 {
00672 }
00674 ThreadPool::ThreadPool(const ThreadPool& x)
00675    : IntrusiveCountableBase(x)
00676    , m_impl(x.m_impl)
00677 {
00678 }
00680 ThreadPool& ThreadPool::operator=(const ThreadPool& x)
00681 {
00682    m_impl = x.m_impl;
00683    return *this;
00684 }
00685 
00686 } // end namespace BLOCXX_NAMESPACE
00687 

Generated on Fri Jun 16 15:39:09 2006 for blocxx by  doxygen 1.4.6