00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00036 #include "blocxx/BLOCXX_config.h"
00037 #include "blocxx/Condition.hpp"
00038 #include "blocxx/NonRecursiveMutexLock.hpp"
00039 #include "blocxx/ExceptionIds.hpp"
00040
00041 #include <cassert>
00042 #include <cerrno>
00043 #ifdef BLOCXX_HAVE_SYS_TIME_H
00044 #include <sys/time.h>
00045 #endif
00046
00047 namespace BLOCXX_NAMESPACE
00048 {
00049
00050 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionLock);
00051 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionResource);
00052 #if defined(BLOCXX_USE_PTHREAD)
00053
00054 Condition::Condition()
00055 {
00056 int res = pthread_cond_init(&m_condition, 0);
00057 if (res != 0)
00058 {
00059 BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
00060 }
00061 }
00063 Condition::~Condition()
00064 {
00065 int res = pthread_cond_destroy(&m_condition);
00066 assert(res == 0);
00067 }
00069 void
00070 Condition::notifyOne()
00071 {
00072 int res = pthread_cond_signal(&m_condition);
00073 assert(res == 0);
00074 }
00076 void
00077 Condition::notifyAll()
00078 {
00079 int res = pthread_cond_broadcast(&m_condition);
00080 assert(res == 0);
00081 }
00083 void
00084 Condition::doWait(NonRecursiveMutex& mutex)
00085 {
00086 int res;
00087 NonRecursiveMutexLockState state;
00088 mutex.conditionPreWait(state);
00089 res = pthread_cond_wait(&m_condition, state.pmutex);
00090 mutex.conditionPostWait(state);
00091 assert(res == 0);
00092 }
00094 bool
00095 Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00096 {
00097 int res;
00098 NonRecursiveMutexLockState state;
00099 mutex.conditionPreWait(state);
00100 bool ret = false;
00101 timespec ts;
00102 struct timeval now;
00103 ::gettimeofday(&now, NULL);
00104
00105 ts.tv_sec = now.tv_sec + sTimeout;
00106
00107 const int NANOSECONDS_PER_MICROSECOND = 1000;
00108 const int NANOSECONDS_PER_SECOND = 1000000000;
00109 int nsec = (now.tv_usec + usTimeout) * NANOSECONDS_PER_MICROSECOND;
00110 ts.tv_sec += nsec / NANOSECONDS_PER_SECOND;
00111 ts.tv_nsec = nsec % NANOSECONDS_PER_SECOND;
00112
00113 res = pthread_cond_timedwait(&m_condition, state.pmutex, &ts);
00114 mutex.conditionPostWait(state);
00115 assert(res == 0 || res == ETIMEDOUT);
00116 ret = res != ETIMEDOUT;
00117 return ret;
00118 }
00119 #elif defined (BLOCXX_WIN32)
00120
00121 Condition::Condition()
00122 : m_condition(new ConditionInfo_t)
00123 {
00124 m_condition->waitersCount = 0;
00125 m_condition->wasBroadcast = false;
00126 m_condition->queue = ::CreateSemaphore(
00127 NULL,
00128 0,
00129 0x7fffffff,
00130 NULL);
00131 ::InitializeCriticalSection(&m_condition->waitersCountLock);
00132 m_condition->waitersDone = ::CreateEvent(
00133 NULL,
00134 false,
00135 false,
00136 NULL);
00137 }
00139 Condition::~Condition()
00140 {
00141 ::CloseHandle(m_condition->queue);
00142 ::DeleteCriticalSection(&m_condition->waitersCountLock);
00143 ::CloseHandle(m_condition->waitersDone);
00144 delete m_condition;
00145 }
00147 void
00148 Condition::notifyOne()
00149 {
00150 ::EnterCriticalSection(&m_condition->waitersCountLock);
00151 bool haveWaiters = m_condition->waitersCount > 0;
00152 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00153
00154
00155 if (haveWaiters)
00156 {
00157 ::ReleaseSemaphore(m_condition->queue, 1, 0);
00158 }
00159 }
00161 void
00162 Condition::notifyAll()
00163 {
00164 ::EnterCriticalSection(&m_condition->waitersCountLock);
00165 bool haveWaiters = false;
00166 if (m_condition->waitersCount > 0)
00167 {
00168
00169 haveWaiters = m_condition->wasBroadcast = true;
00170 }
00171
00172 if (haveWaiters)
00173 {
00174
00175 ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
00176 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00177
00178
00179 ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
00180 m_condition->wasBroadcast = false;
00181 }
00182 else
00183 {
00184 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00185 }
00186 }
00188 void
00189 Condition::doWait(NonRecursiveMutex& mutex)
00190 {
00191 doTimedWait(mutex, INFINITE, 0);
00192 }
00194 bool
00195 Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00196 {
00197 bool cc = true;
00198 NonRecursiveMutexLockState state;
00199 mutex.conditionPreWait(state);
00200
00201 ::EnterCriticalSection(&m_condition->waitersCountLock);
00202 m_condition->waitersCount++;
00203 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00204
00205
00206 if (sTimeout != INFINITE)
00207 {
00208 sTimeout *= 1000;
00209 sTimeout += usTimeout / 1000;
00210 }
00211
00212
00213
00214 if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, sTimeout,
00215 false) == WAIT_TIMEOUT)
00216 {
00217 cc = false;
00218 }
00219
00220 ::EnterCriticalSection(&m_condition->waitersCountLock);
00221 m_condition->waitersCount--;
00222
00223
00224 bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
00225 && cc == true);
00226
00227 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00228
00229
00230
00231 if (isLastWaiter)
00232 {
00233
00234
00235 ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
00236 INFINITE, false);
00237 }
00238 else
00239 {
00240
00241 ::WaitForSingleObject(mutex.m_mutex, INFINITE);
00242 }
00243 mutex.conditionPostWait(state);
00244 return cc;
00245 }
00246 #else
00247 #error "port me!"
00248 #endif
00249
00250 void
00251 Condition::wait(NonRecursiveMutexLock& lock)
00252 {
00253 if (!lock.isLocked())
00254 {
00255 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00256 }
00257 doWait(*(lock.m_mutex));
00258 }
00260 bool
00261 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
00262 {
00263 if (!lock.isLocked())
00264 {
00265 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00266 }
00267 return doTimedWait(*(lock.m_mutex), sTimeout, usTimeout);
00268 }
00269
00270 }
00271