Condition.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2004 Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2005 Novell, Inc. All rights reserved.
00004 *
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 *
00008 *  - Redistributions of source code must retain the above copyright notice,
00009 *    this list of conditions and the following disclaimer.
00010 *
00011 *  - Redistributions in binary form must reproduce the above copyright notice,
00012 *    this list of conditions and the following disclaimer in the documentation
00013 *    and/or other materials provided with the distribution.
00014 *
00015 *  - Neither the name of Vintela, Inc., Novell, Inc., nor the names of its
00016 *    contributors may be used to endorse or promote products derived from this
00017 *    software without specific prior written permission.
00018 *
00019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00020 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00021 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00022 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc., Novell, Inc., OR THE 
00023 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
00024 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
00025 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
00026 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00027 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
00028 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
00029 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 *******************************************************************************/
00031 
00032 
00036 #include "blocxx/BLOCXX_config.h"
00037 #include "blocxx/Condition.hpp"
00038 #include "blocxx/NonRecursiveMutexLock.hpp"
00039 #include "blocxx/ExceptionIds.hpp"
00040 
00041 #include <cassert>
00042 #include <cerrno>
00043 #ifdef BLOCXX_HAVE_SYS_TIME_H
00044 #include <sys/time.h>
00045 #endif
00046 
00047 namespace BLOCXX_NAMESPACE
00048 {
00049 
00050 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionLock);
00051 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionResource);
00052 #if defined(BLOCXX_USE_PTHREAD)
00053 
00054 Condition::Condition()
00055 {
00056    int res = pthread_cond_init(&m_condition, 0);
00057    if (res != 0)
00058    {
00059       BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
00060    }
00061 }
00063 Condition::~Condition()
00064 {
00065    int res = pthread_cond_destroy(&m_condition);
00066    assert(res == 0);
00067 }
00069 void
00070 Condition::notifyOne()
00071 {
00072    int res = pthread_cond_signal(&m_condition);
00073    assert(res == 0);
00074 }
00076 void
00077 Condition::notifyAll()
00078 {
00079    int res = pthread_cond_broadcast(&m_condition);
00080    assert(res == 0);
00081 }
00083 void
00084 Condition::doWait(NonRecursiveMutex& mutex)
00085 {
00086    int res;
00087    NonRecursiveMutexLockState state;
00088    mutex.conditionPreWait(state);
00089    res = pthread_cond_wait(&m_condition, state.pmutex);
00090    mutex.conditionPostWait(state);
00091    assert(res == 0);
00092 }
00094 bool
00095 Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00096 {
00097    int res;
00098    NonRecursiveMutexLockState state;
00099    mutex.conditionPreWait(state);
00100    bool ret = false;
00101    timespec ts;
00102    struct timeval now;
00103    ::gettimeofday(&now, NULL);
00104    
00105    ts.tv_sec = now.tv_sec + sTimeout;
00106 
00107    const int NANOSECONDS_PER_MICROSECOND = 1000;
00108    const int NANOSECONDS_PER_SECOND = 1000000000;
00109    int nsec = (now.tv_usec + usTimeout) * NANOSECONDS_PER_MICROSECOND;
00110    ts.tv_sec += nsec / NANOSECONDS_PER_SECOND;
00111    ts.tv_nsec = nsec % NANOSECONDS_PER_SECOND;
00112 
00113    res = pthread_cond_timedwait(&m_condition, state.pmutex, &ts);
00114    mutex.conditionPostWait(state);
00115    assert(res == 0 || res == ETIMEDOUT);
00116    ret = res != ETIMEDOUT;
00117    return ret;
00118 }
00119 #elif defined (BLOCXX_WIN32)
00120 
00121 Condition::Condition()
00122    : m_condition(new ConditionInfo_t)
00123 {
00124    m_condition->waitersCount = 0;
00125    m_condition->wasBroadcast = false;
00126    m_condition->queue = ::CreateSemaphore(
00127       NULL,    // No security
00128       0,       // initially 0
00129       0x7fffffff, // max count
00130       NULL);      // Unnamed
00131    ::InitializeCriticalSection(&m_condition->waitersCountLock);
00132    m_condition->waitersDone = ::CreateEvent(
00133       NULL,    // No security
00134       false,      // auto-reset
00135       false,      // non-signaled initially
00136       NULL);      // Unnamed
00137 }
00139 Condition::~Condition()
00140 {
00141    ::CloseHandle(m_condition->queue);
00142    ::DeleteCriticalSection(&m_condition->waitersCountLock);
00143    ::CloseHandle(m_condition->waitersDone);
00144    delete m_condition;
00145 }
00147 void
00148 Condition::notifyOne()
00149 {
00150    ::EnterCriticalSection(&m_condition->waitersCountLock);
00151    bool haveWaiters = m_condition->waitersCount > 0;
00152    ::LeaveCriticalSection(&m_condition->waitersCountLock);
00153 
00154    // If no threads waiting, then this is a no-op
00155    if (haveWaiters)
00156    {
00157       ::ReleaseSemaphore(m_condition->queue, 1, 0);
00158    }
00159 }
00161 void
00162 Condition::notifyAll()
00163 {
00164    ::EnterCriticalSection(&m_condition->waitersCountLock);
00165    bool haveWaiters = false;
00166    if (m_condition->waitersCount > 0)
00167    {
00168       // It's gonna be a broadcast, even if there's only one waiting thread.
00169       haveWaiters = m_condition->wasBroadcast = true;
00170    }
00171 
00172    if (haveWaiters)
00173    {
00174       // Wake up all the waiting threads atomically
00175       ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
00176       ::LeaveCriticalSection(&m_condition->waitersCountLock);
00177 
00178       // Wait for all the threads to acquire the counting semaphore
00179       ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
00180       m_condition->wasBroadcast = false;
00181    }
00182    else
00183    {
00184       ::LeaveCriticalSection(&m_condition->waitersCountLock);
00185    }
00186 }
00188 void
00189 Condition::doWait(NonRecursiveMutex& mutex)
00190 {
00191    doTimedWait(mutex, INFINITE, 0);
00192 }
00194 bool
00195 Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00196 {
00197    bool cc = true;
00198    NonRecursiveMutexLockState state;
00199    mutex.conditionPreWait(state);
00200 
00201    ::EnterCriticalSection(&m_condition->waitersCountLock);
00202    m_condition->waitersCount++;
00203    ::LeaveCriticalSection(&m_condition->waitersCountLock);
00204 
00205    // Calc timeout if specified
00206    if (sTimeout != INFINITE)
00207    {
00208       sTimeout *= 1000;    // Convert to ms
00209       sTimeout += usTimeout / 1000;    // Convert micro seconds to ms and add
00210    }
00211 
00212    // Atomically release the mutex and wait on the
00213    // queue until signal/broadcast.
00214    if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, sTimeout,
00215       false) == WAIT_TIMEOUT)
00216    {
00217       cc = false;
00218    }
00219 
00220    ::EnterCriticalSection(&m_condition->waitersCountLock);
00221    m_condition->waitersCount--;
00222 
00223    // Check to see if we're the last waiter after the broadcast
00224    bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
00225       && cc == true);
00226 
00227    ::LeaveCriticalSection(&m_condition->waitersCountLock);
00228 
00229    // If this is the last thread waiting for this broadcast, then let all the
00230    // other threads proceed.
00231    if (isLastWaiter)
00232    {
00233       // Atomically signal the waitersDone event and wait to acquire
00234       // the external mutex. Enusres fairness
00235       ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
00236          INFINITE, false);
00237    }
00238    else
00239    {
00240       // Re-gain ownership of the external mutex
00241       ::WaitForSingleObject(mutex.m_mutex, INFINITE);
00242    }
00243    mutex.conditionPostWait(state);
00244    return cc;
00245 }
00246 #else
00247 #error "port me!"
00248 #endif
00249 
00250 void
00251 Condition::wait(NonRecursiveMutexLock& lock)
00252 {
00253    if (!lock.isLocked())
00254    {
00255       BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00256    }
00257    doWait(*(lock.m_mutex));
00258 }
00260 bool
00261 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
00262 {
00263    if (!lock.isLocked())
00264    {
00265       BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00266    }
00267    return doTimedWait(*(lock.m_mutex), sTimeout, usTimeout);
00268 }
00269 
00270 } // end namespace BLOCXX_NAMESPACE
00271 

Generated on Fri Jun 16 15:39:08 2006 for blocxx by  doxygen 1.4.6