Rewrite IThreader implementation around new synchronization primitives (bug 5862, r=fyren).

This commit is contained in:
David Anderson 2013-08-19 20:58:02 -07:00
parent 582162460f
commit 45856816c1
15 changed files with 212 additions and 542 deletions

View File

@ -163,6 +163,7 @@ class SM:
self.compiler.AddToListVar('CXXFLAGS', '-fno-threadsafe-statics') self.compiler.AddToListVar('CXXFLAGS', '-fno-threadsafe-statics')
self.compiler.AddToListVar('CXXFLAGS', '-Wno-non-virtual-dtor') self.compiler.AddToListVar('CXXFLAGS', '-Wno-non-virtual-dtor')
self.compiler.AddToListVar('CXXFLAGS', '-Wno-overloaded-virtual') self.compiler.AddToListVar('CXXFLAGS', '-Wno-overloaded-virtual')
self.compiler.AddToListVar('CXXFLAGS', '-Wno-narrowing')
if (self.vendor == 'gcc' and cxx.majorVersion >= 4 and cxx.minorVersion >= 7) or \ if (self.vendor == 'gcc' and cxx.majorVersion >= 4 and cxx.minorVersion >= 7) or \
(self.vendor == 'clang' and cxx.majorVersion >= 3): (self.vendor == 'clang' and cxx.majorVersion >= 3):
self.compiler.AddToListVar('CXXFLAGS', '-Wno-delete-non-virtual-dtor') self.compiler.AddToListVar('CXXFLAGS', '-Wno-delete-non-virtual-dtor')

View File

@ -638,7 +638,7 @@ QueryCvarCookie_t ConVarManager::QueryClientConVar(edict_t *pPlayer, const char
return InvalidQueryCvarCookie; return InvalidQueryCvarCookie;
} }
ConVarQuery query = {cookie, pCallback, hndl}; ConVarQuery query = {cookie, pCallback, (cell_t)hndl};
m_ConVarQueries.push_back(query); m_ConVarQueries.push_back(query);
#endif #endif

View File

@ -480,7 +480,7 @@ void CRadioMenuPlayer::Radio_Init(int keys, const char *title, const char *text)
void CRadioMenuPlayer::Radio_Refresh() void CRadioMenuPlayer::Radio_Refresh()
{ {
#if SOURCE_ENGINE != SE_DOTA #if SOURCE_ENGINE != SE_DOTA
cell_t players[1] = {m_index}; cell_t players[1] = { (cell_t)m_index };
char *ptr = display_pkt; char *ptr = display_pkt;
char save = 0; char save = 0;
size_t len = display_len; size_t len = display_len;

View File

@ -12,6 +12,7 @@ compiler['CDEFINES'].append('SM_LOGIC')
if AMBuild.target['platform'] == 'linux': if AMBuild.target['platform'] == 'linux':
compiler['POSTLINKFLAGS'].append('-lpthread') compiler['POSTLINKFLAGS'].append('-lpthread')
compiler['POSTLINKFLAGS'].append('-lrt')
if AMBuild.target['platform'] == 'darwin': if AMBuild.target['platform'] == 'darwin':
compiler['CFLAGS'].extend(['-Wno-deprecated-declarations']) compiler['CFLAGS'].extend(['-Wno-deprecated-declarations'])
compiler['POSTLINKFLAGS'].extend(['-framework', 'CoreServices']) compiler['POSTLINKFLAGS'].extend(['-framework', 'CoreServices'])

View File

@ -33,9 +33,48 @@
#define _INCLUDE_SOURCEMOD_THREAD_SUPPORT_H #define _INCLUDE_SOURCEMOD_THREAD_SUPPORT_H
#include <IThreader.h> #include <IThreader.h>
#include <ke_thread_utils.h>
#include <ke_utility.h>
using namespace SourceMod; using namespace SourceMod;
class CompatMutex : public IMutex
{
public:
bool TryLock() {
return mutex_.TryLock();
}
void Lock() {
mutex_.Lock();
}
void Unlock() {
mutex_.Unlock();
}
void DestroyThis() {
delete this;
}
private:
ke::Mutex mutex_;
};
class CompatCondVar : public IEventSignal
{
public:
void Wait() {
ke::AutoLock lock(&cv_);
cv_.Wait();
}
void Signal() {
ke::AutoLock lock(&cv_);
cv_.Notify();
}
void DestroyThis() {
delete this;
}
private:
ke::ConditionVariable cv_;
};
extern IThreader *g_pThreader; extern IThreader *g_pThreader;
#endif //_INCLUDE_SOURCEMOD_THREAD_SUPPORT_H #endif //_INCLUDE_SOURCEMOD_THREAD_SUPPORT_H

View File

@ -1,5 +1,5 @@
/** /**
* vim: set ts=4 : * vim: set ts=4 sw=4 tw=99 noet:
* ============================================================================= * =============================================================================
* SourceMod * SourceMod
* Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved.
@ -61,17 +61,9 @@ void PosixThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min)
IMutex *PosixThreader::MakeMutex() IMutex *PosixThreader::MakeMutex()
{ {
pthread_mutex_t mutex; return new CompatMutex();
if (pthread_mutex_init(&mutex, NULL) != 0)
return NULL;
PosixMutex *pMutex = new PosixMutex(mutex);
return pMutex;
} }
void PosixThreader::MakeThread(IThread *pThread) void PosixThreader::MakeThread(IThread *pThread)
{ {
ThreadParams defparams; ThreadParams defparams;
@ -92,30 +84,21 @@ IThreadHandle *PosixThreader::MakeThread(IThread *pThread, ThreadFlags flags)
return MakeThread(pThread, &defparams); return MakeThread(pThread, &defparams);
} }
void *Posix_ThreadGate(void *param) void PosixThreader::ThreadHandle::Run()
{ {
PosixThreader::ThreadHandle *pHandle = // Wait for an unpause if necessary.
reinterpret_cast<PosixThreader::ThreadHandle *>(param); {
ke::AutoLock lock(&m_runlock);
if (m_state == Thread_Paused)
m_runlock.Wait();
}
//Block this thread from being started initially. m_run->RunThread(this);
pthread_mutex_lock(&pHandle->m_runlock); m_state = Thread_Done;
//if we get here, we've obtained the lock and are allowed to run. m_run->OnTerminate(this, false);
//unlock and continue.
pthread_mutex_unlock(&pHandle->m_runlock);
pHandle->m_run->RunThread(pHandle); if (m_params.flags & Thread_AutoRelease)
delete this;
ThreadParams params;
pthread_mutex_lock(&pHandle->m_statelock);
pHandle->m_state = Thread_Done;
pHandle->GetParams(&params);
pthread_mutex_unlock(&pHandle->m_statelock);
pHandle->m_run->OnTerminate(pHandle, false);
if (params.flags & Thread_AutoRelease)
delete pHandle;
return 0;
} }
ThreadParams g_defparams; ThreadParams g_defparams;
@ -124,68 +107,21 @@ IThreadHandle *PosixThreader::MakeThread(IThread *pThread, const ThreadParams *p
if (params == NULL) if (params == NULL)
params = &g_defparams; params = &g_defparams;
PosixThreader::ThreadHandle *pHandle = ke::AutoPtr<ThreadHandle> pHandle(new ThreadHandle(this, pThread, params));
new PosixThreader::ThreadHandle(this, pThread, params);
pthread_mutex_lock(&pHandle->m_runlock); pHandle->m_thread = new ke::Thread(pHandle, "SourceMod");
if (!pHandle->m_thread->Succeeded())
int err;
err = pthread_create(&pHandle->m_thread, NULL, Posix_ThreadGate, (void *)pHandle);
if (err != 0)
{
pthread_mutex_unlock(&pHandle->m_runlock);
delete pHandle;
return NULL; return NULL;
}
//Don't bother setting priority... if (!(params->flags & Thread_CreateSuspended))
pHandle->Unpause();
if (!(pHandle->m_params.flags & Thread_CreateSuspended)) return pHandle.take();
{
pHandle->m_state = Thread_Running;
err = pthread_mutex_unlock(&pHandle->m_runlock);
if (err != 0)
pHandle->m_state = Thread_Paused;
}
return pHandle;
} }
IEventSignal *PosixThreader::MakeEventSignal() IEventSignal *PosixThreader::MakeEventSignal()
{ {
return new PosixEventSignal(); return new CompatCondVar();
}
/*****************
**** Mutexes ****
*****************/
PosixThreader::PosixMutex::~PosixMutex()
{
pthread_mutex_destroy(&m_mutex);
}
bool PosixThreader::PosixMutex::TryLock()
{
int err = pthread_mutex_trylock(&m_mutex);
return (err == 0);
}
void PosixThreader::PosixMutex::Lock()
{
pthread_mutex_lock(&m_mutex);
}
void PosixThreader::PosixMutex::Unlock()
{
pthread_mutex_unlock(&m_mutex);
}
void PosixThreader::PosixMutex::DestroyThis()
{
delete this;
} }
/****************** /******************
@ -195,35 +131,24 @@ void PosixThreader::PosixMutex::DestroyThis()
PosixThreader::ThreadHandle::ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params) : PosixThreader::ThreadHandle::ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params) :
m_parent(parent), m_params(*params), m_run(run), m_state(Thread_Paused) m_parent(parent), m_params(*params), m_run(run), m_state(Thread_Paused)
{ {
pthread_mutex_init(&m_runlock, NULL);
pthread_mutex_init(&m_statelock, NULL);
} }
PosixThreader::ThreadHandle::~ThreadHandle() PosixThreader::ThreadHandle::~ThreadHandle()
{ {
pthread_mutex_destroy(&m_runlock);
pthread_mutex_destroy(&m_statelock);
} }
bool PosixThreader::ThreadHandle::WaitForThread() bool PosixThreader::ThreadHandle::WaitForThread()
{ {
void *arg; if (!m_thread)
if (pthread_join(m_thread, &arg) != 0)
return false; return false;
m_thread->Join();
return true; return true;
} }
ThreadState PosixThreader::ThreadHandle::GetState() ThreadState PosixThreader::ThreadHandle::GetState()
{ {
ThreadState state; return m_state;
pthread_mutex_lock(&m_statelock);
state = m_state;
pthread_mutex_unlock(&m_statelock);
return state;
} }
IThreadCreator *PosixThreader::ThreadHandle::Parent() IThreadCreator *PosixThreader::ThreadHandle::Parent()
@ -262,48 +187,9 @@ bool PosixThreader::ThreadHandle::Unpause()
if (m_state != Thread_Paused) if (m_state != Thread_Paused)
return false; return false;
ke::AutoLock lock(&m_runlock);
m_state = Thread_Running; m_state = Thread_Running;
m_runlock.Notify();
if (pthread_mutex_unlock(&m_runlock) != 0)
{
m_state = Thread_Paused;
return false;
}
return true; return true;
} }
/*****************
* EVENT SIGNALS *
*****************/
PosixThreader::PosixEventSignal::PosixEventSignal()
{
pthread_cond_init(&m_cond, NULL);
pthread_mutex_init(&m_mutex, NULL);
}
PosixThreader::PosixEventSignal::~PosixEventSignal()
{
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_mutex);
}
void PosixThreader::PosixEventSignal::Wait()
{
pthread_mutex_lock(&m_mutex);
pthread_cond_wait(&m_cond, &m_mutex);
pthread_mutex_unlock(&m_mutex);
}
void PosixThreader::PosixEventSignal::Signal()
{
pthread_mutex_lock(&m_mutex);
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
}
void PosixThreader::PosixEventSignal::DestroyThis()
{
delete this;
}

View File

@ -1,5 +1,5 @@
/** /**
* vim: set ts=4 : * vim: set ts=4 sw=4 tw=99 noet:
* ============================================================================= * =============================================================================
* SourceMod * SourceMod
* Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved.
@ -33,19 +33,17 @@
#define _INCLUDE_POSIXTHREADS_H_ #define _INCLUDE_POSIXTHREADS_H_
#include <pthread.h> #include <pthread.h>
#include <ke_thread_utils.h>
#include "IThreader.h" #include "IThreader.h"
using namespace SourceMod; using namespace SourceMod;
void *Posix_ThreadGate(void *param);
class PosixThreader : public IThreader class PosixThreader : public IThreader
{ {
public: public:
class ThreadHandle : public IThreadHandle class ThreadHandle : public IThreadHandle, public ke::IRunnable
{ {
friend class PosixThreader; friend class PosixThreader;
friend void *Posix_ThreadGate(void *param);
public: public:
ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params); ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params);
virtual ~ThreadHandle(); virtual ~ThreadHandle();
@ -58,43 +56,16 @@ public:
virtual bool SetPriority(ThreadPriority prio); virtual bool SetPriority(ThreadPriority prio);
virtual ThreadState GetState(); virtual ThreadState GetState();
virtual bool Unpause(); virtual bool Unpause();
public:
void Run();
protected: protected:
IThreader *m_parent; //Parent handle IThreader *m_parent; //Parent handle
pthread_t m_thread; //Windows HANDLE
ThreadParams m_params; //Current Parameters ThreadParams m_params; //Current Parameters
IThread *m_run; //Runnable context IThread *m_run; //Runnable context
pthread_mutex_t m_statelock; ke::AutoPtr<ke::Thread> m_thread;
pthread_mutex_t m_runlock; ke::ConditionVariable m_runlock;
ThreadState m_state; //internal state ThreadState m_state; //internal state
}; };
class PosixMutex : public IMutex
{
public:
PosixMutex(pthread_mutex_t m) : m_mutex(m)
{
};
virtual ~PosixMutex();
public:
virtual bool TryLock();
virtual void Lock();
virtual void Unlock();
virtual void DestroyThis();
protected:
pthread_mutex_t m_mutex;
};
class PosixEventSignal : public IEventSignal
{
public:
PosixEventSignal();
virtual ~PosixEventSignal();
public:
virtual void Wait();
virtual void Signal();
virtual void DestroyThis();
protected:
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;
};
public: public:
IMutex *MakeMutex(); IMutex *MakeMutex();
void MakeThread(IThread *pThread); void MakeThread(IThread *pThread);

View File

@ -33,10 +33,6 @@
ThreadWorker::ThreadWorker(IThreadWorkerCallbacks *hooks) : BaseWorker(hooks), ThreadWorker::ThreadWorker(IThreadWorkerCallbacks *hooks) : BaseWorker(hooks),
m_Threader(NULL), m_Threader(NULL),
m_QueueLock(NULL),
m_StateLock(NULL),
m_PauseSignal(NULL),
m_AddSignal(NULL),
me(NULL), me(NULL),
m_think_time(DEFAULT_THINK_TIME_MS) m_think_time(DEFAULT_THINK_TIME_MS)
{ {
@ -46,32 +42,19 @@ ThreadWorker::ThreadWorker(IThreadWorkerCallbacks *hooks) : BaseWorker(hooks),
ThreadWorker::ThreadWorker(IThreadWorkerCallbacks *hooks, IThreader *pThreader, unsigned int thinktime) : ThreadWorker::ThreadWorker(IThreadWorkerCallbacks *hooks, IThreader *pThreader, unsigned int thinktime) :
BaseWorker(hooks), BaseWorker(hooks),
m_Threader(pThreader), m_Threader(pThreader),
m_QueueLock(NULL),
m_StateLock(NULL),
m_PauseSignal(NULL),
m_AddSignal(NULL),
me(NULL), me(NULL),
m_think_time(thinktime) m_think_time(thinktime)
{ {
if (m_Threader) m_state = m_Threader ? Worker_Stopped : Worker_Invalid;
{
m_state = Worker_Stopped;
} else {
m_state = Worker_Invalid;
}
} }
ThreadWorker::~ThreadWorker() ThreadWorker::~ThreadWorker()
{ {
if (m_state != Worker_Stopped || m_state != Worker_Invalid) if (m_state != Worker_Stopped || m_state != Worker_Invalid)
{
Stop(true); Stop(true);
}
if (m_ThreadQueue.size()) if (m_ThreadQueue.size())
{
Flush(true); Flush(true);
}
} }
void ThreadWorker::OnTerminate(IThreadHandle *pHandle, bool cancel) void ThreadWorker::OnTerminate(IThreadHandle *pHandle, bool cancel)
@ -82,131 +65,88 @@ void ThreadWorker::OnTerminate(IThreadHandle *pHandle, bool cancel)
void ThreadWorker::RunThread(IThreadHandle *pHandle) void ThreadWorker::RunThread(IThreadHandle *pHandle)
{ {
WorkerState this_state = Worker_Running;
size_t num;
if (m_pHooks) if (m_pHooks)
{
m_pHooks->OnWorkerStart(this); m_pHooks->OnWorkerStart(this);
}
ke::AutoLock lock(&monitor_);
while (true) while (true)
{ {
/** if (m_state == Worker_Paused)
* Check number of items in the queue
*/
m_StateLock->Lock();
this_state = m_state;
m_StateLock->Unlock();
if (this_state != Worker_Stopped)
{ {
m_QueueLock->Lock(); // Wait until we're told to wake up.
num = m_ThreadQueue.size(); monitor_.Wait();
if (!num) continue;
{
/**
* if none, wait for an item
*/
m_Waiting = true;
m_QueueLock->Unlock();
/* first check if we should end again */
if (this_state == Worker_Stopped)
{
break;
}
m_AddSignal->Wait();
m_Waiting = false;
} else {
m_QueueLock->Unlock();
}
} }
m_StateLock->Lock();
this_state = m_state;
m_StateLock->Unlock();
if (this_state != Worker_Running)
{
if (this_state == Worker_Paused || this_state == Worker_Stopped)
{
//wait until the lock is cleared.
if (this_state == Worker_Paused)
{
m_PauseSignal->Wait();
}
if (this_state == Worker_Stopped)
{
//if we're supposed to flush cleanrly,
// run all of the remaining frames first.
if (!m_FlushType)
{
while (m_ThreadQueue.size())
{
RunFrame();
}
}
break;
}
}
}
/**
* Run the frame.
*/
RunFrame();
/** if (m_state == Worker_Stopped)
* wait in between threads if specified
*/
if (m_think_time)
{ {
m_Threader->ThreadSleep(m_think_time); // We've been told to stop entirely. If we've also been told to
// flush the queue, do that now.
while (!m_ThreadQueue.empty())
{
// Release the lock since PopThreadFromQueue() will re-acquire it. The
// main thread is blocking anyway.
ke::AutoUnlock unlock(&monitor_);
RunFrame();
}
assert(m_state == Worker_Stopped);
return;
} }
assert(m_state == Worker_Running);
// Process one frame.
WorkerState oldstate = m_state;
{
ke::AutoUnlock unlock(&monitor_);
RunFrame();
}
// If the state changed, loop back and process the new state.
if (m_state != oldstate)
continue;
// If the thread queue is now empty, wait for a signal. Otherwise, if
// we're on a delay, wait for either a notification or a timeout to
// process the next item. If the queue has items and we don't have a
// delay, then we just loop around and keep processing.
if (m_ThreadQueue.empty())
monitor_.Wait();
else if (m_think_time)
monitor_.Wait(m_think_time);
} }
if (m_pHooks)
{ {
m_pHooks->OnWorkerStop(this); ke::AutoUnlock unlock(&monitor_);
if (m_pHooks)
m_pHooks->OnWorkerStop(this);
} }
} }
SWThreadHandle *ThreadWorker::PopThreadFromQueue() SWThreadHandle *ThreadWorker::PopThreadFromQueue()
{ {
if (m_state <= Worker_Stopped && !m_QueueLock) ke::AutoLock lock(&monitor_);
{ if (m_state <= Worker_Stopped)
return NULL; return NULL;
}
SWThreadHandle *swt; return BaseWorker::PopThreadFromQueue();
m_QueueLock->Lock();
swt = BaseWorker::PopThreadFromQueue();
m_QueueLock->Unlock();
return swt;
} }
void ThreadWorker::AddThreadToQueue(SWThreadHandle *pHandle) void ThreadWorker::AddThreadToQueue(SWThreadHandle *pHandle)
{ {
ke::AutoLock lock(&monitor_);
if (m_state <= Worker_Stopped) if (m_state <= Worker_Stopped)
{
return; return;
}
m_QueueLock->Lock();
BaseWorker::AddThreadToQueue(pHandle); BaseWorker::AddThreadToQueue(pHandle);
if (m_Waiting) monitor_.Notify();
{
m_AddSignal->Signal();
}
m_QueueLock->Unlock();
} }
WorkerState ThreadWorker::GetStatus(unsigned int *threads) WorkerState ThreadWorker::GetStatus(unsigned int *threads)
{ {
WorkerState state; ke::AutoLock lock(&monitor_);
return BaseWorker::GetStatus(threads);
m_StateLock->Lock();
state = BaseWorker::GetStatus(threads);
m_StateLock->Unlock();
return state;
} }
void ThreadWorker::SetThinkTimePerFrame(unsigned int thinktime) void ThreadWorker::SetThinkTimePerFrame(unsigned int thinktime)
@ -216,56 +156,32 @@ void ThreadWorker::SetThinkTimePerFrame(unsigned int thinktime)
bool ThreadWorker::Start() bool ThreadWorker::Start()
{ {
if (m_state == Worker_Invalid) if (m_state == Worker_Invalid && m_Threader == NULL)
{ return false;
if (m_Threader == NULL)
{ if (m_state != Worker_Stopped)
return false;
}
} else if (m_state != Worker_Stopped) {
return false; return false;
}
m_Waiting = false;
m_QueueLock = m_Threader->MakeMutex();
m_StateLock = m_Threader->MakeMutex();
m_PauseSignal = m_Threader->MakeEventSignal();
m_AddSignal = m_Threader->MakeEventSignal();
m_state = Worker_Running; m_state = Worker_Running;
ThreadParams pt; ThreadParams pt;
pt.flags = Thread_Default; pt.flags = Thread_Default;
pt.prio = ThreadPrio_Normal; pt.prio = ThreadPrio_Normal;
me = m_Threader->MakeThread(this, &pt); me = m_Threader->MakeThread(this, &pt);
return true; return true;
} }
bool ThreadWorker::Stop(bool flush_cancel) bool ThreadWorker::Stop(bool flush_cancel)
{ {
if (m_state == Worker_Invalid || m_state == Worker_Stopped) // Change the state to signal a stop, and then trigger a notify.
{ {
return false; ke::AutoLock lock(&monitor_);
} if (m_state == Worker_Invalid || m_state == Worker_Stopped)
return false;
WorkerState oldstate; m_state = Worker_Stopped;
m_FlushType = flush_cancel;
//set new state monitor_.Notify();
m_StateLock->Lock();
oldstate = m_state;
m_state = Worker_Stopped;
m_FlushType = flush_cancel;
m_StateLock->Unlock();
if (oldstate == Worker_Paused)
{
Unpause();
} else {
m_QueueLock->Lock();
if (m_Waiting)
{
m_AddSignal->Signal();
}
m_QueueLock->Unlock();
} }
me->WaitForThread(); me->WaitForThread();
@ -274,33 +190,18 @@ bool ThreadWorker::Stop(bool flush_cancel)
//flush all remaining events //flush all remaining events
Flush(true); Flush(true);
//free mutex locks
m_QueueLock->DestroyThis();
m_StateLock->DestroyThis();
m_PauseSignal->DestroyThis();
m_AddSignal->DestroyThis();
//invalidizzle
m_QueueLock = NULL;
m_StateLock = NULL;
m_PauseSignal = NULL;
m_AddSignal = NULL;
me = NULL; me = NULL;
return true; return true;
} }
bool ThreadWorker::Pause() bool ThreadWorker::Pause()
{ {
if (m_state != Worker_Running) if (m_state != Worker_Running)
{
return false; return false;
}
m_StateLock->Lock(); ke::AutoLock lock(&monitor_);
m_state = Worker_Paused; m_state = Worker_Paused;
m_StateLock->Unlock(); monitor_.Notify();
return true; return true;
} }
@ -308,18 +209,10 @@ bool ThreadWorker::Pause()
bool ThreadWorker::Unpause() bool ThreadWorker::Unpause()
{ {
if (m_state != Worker_Paused) if (m_state != Worker_Paused)
{
return false; return false;
}
m_StateLock->Lock(); ke::AutoLock lock(&monitor_);
m_state = Worker_Running; m_state = Worker_Running;
m_StateLock->Unlock(); monitor_.Notify();
m_PauseSignal->Signal();
if (m_Waiting)
{
m_AddSignal->Signal();
}
return true; return true;
} }

View File

@ -60,14 +60,10 @@ public: //BaseWorker
virtual SWThreadHandle *PopThreadFromQueue(); virtual SWThreadHandle *PopThreadFromQueue();
protected: protected:
IThreader *m_Threader; IThreader *m_Threader;
IMutex *m_QueueLock;
IMutex *m_StateLock;
IEventSignal *m_PauseSignal;
IEventSignal *m_AddSignal;
IThreadHandle *me; IThreadHandle *me;
unsigned int m_think_time; unsigned int m_think_time;
volatile bool m_Waiting; bool m_FlushType;
volatile bool m_FlushType; ke::ConditionVariable monitor_;
}; };
#endif //_INCLUDE_SOURCEMOD_THREADWORKER_H #endif //_INCLUDE_SOURCEMOD_THREADWORKER_H

View File

@ -1,5 +1,5 @@
/** /**
* vim: set ts=4 : * vim: set ts=4 sw=4 tw=99 noet:
* ============================================================================= * =============================================================================
* SourceMod * SourceMod
* Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved.
@ -55,9 +55,7 @@ void WinThreader::ThreadSleep(unsigned int ms)
IMutex *WinThreader::MakeMutex() IMutex *WinThreader::MakeMutex()
{ {
WinMutex *pMutex = new WinMutex(); return new CompatMutex();
return pMutex;
} }
IThreadHandle *WinThreader::MakeThread(IThread *pThread, ThreadFlags flags) IThreadHandle *WinThreader::MakeThread(IThread *pThread, ThreadFlags flags)
@ -80,24 +78,20 @@ void WinThreader::MakeThread(IThread *pThread)
MakeThread(pThread, &defparams); MakeThread(pThread, &defparams);
} }
DWORD WINAPI Win32_ThreadGate(LPVOID param) void WinThreader::ThreadHandle::Run()
{ {
WinThreader::ThreadHandle *pHandle = // Wait for an unpause if necessary.
reinterpret_cast<WinThreader::ThreadHandle *>(param); {
ke::AutoLock lock(&suspend_);
pHandle->m_run->RunThread(pHandle); if (m_state == Thread_Paused)
suspend_.Wait();
ThreadParams params; }
EnterCriticalSection(&pHandle->m_crit);
pHandle->m_state = Thread_Done; m_run->RunThread(this);
pHandle->GetParams(&params); m_state = Thread_Done;
LeaveCriticalSection(&pHandle->m_crit); m_run->OnTerminate(this, false);
if (m_params.flags & Thread_AutoRelease)
pHandle->m_run->OnTerminate(pHandle, false); delete this;
if (params.flags & Thread_AutoRelease)
delete pHandle;
return 0;
} }
void WinThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) void WinThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min)
@ -112,119 +106,52 @@ IThreadHandle *WinThreader::MakeThread(IThread *pThread, const ThreadParams *par
if (params == NULL) if (params == NULL)
params = &g_defparams; params = &g_defparams;
WinThreader::ThreadHandle *pHandle = ke::AutoPtr<ThreadHandle> pHandle(new ThreadHandle(this, pThread, params));
new WinThreader::ThreadHandle(this, NULL, pThread, params);
DWORD tid; pHandle->m_thread = new ke::Thread(pHandle, "SourceMod");
pHandle->m_thread = if (!pHandle->m_thread->Succeeded())
CreateThread(NULL, 0, &Win32_ThreadGate, (LPVOID)pHandle, CREATE_SUSPENDED, &tid);
if (!pHandle->m_thread)
{
delete pHandle;
return NULL; return NULL;
}
if (pHandle->m_params.prio != ThreadPrio_Normal) if (pHandle->m_params.prio != ThreadPrio_Normal)
{
pHandle->SetPriority(pHandle->m_params.prio); pHandle->SetPriority(pHandle->m_params.prio);
}
if (!(pHandle->m_params.flags & Thread_CreateSuspended)) if (!(params->flags & Thread_CreateSuspended))
{
pHandle->Unpause(); pHandle->Unpause();
}
return pHandle; return pHandle.take();
} }
IEventSignal *WinThreader::MakeEventSignal() IEventSignal *WinThreader::MakeEventSignal()
{ {
HANDLE event = CreateEventA(NULL, FALSE, FALSE, NULL); return new CompatCondVar();
if (!event)
return NULL;
WinEvent *pEvent = new WinEvent(event);
return pEvent;
}
/*****************
**** Mutexes ****
*****************/
WinThreader::WinMutex::WinMutex()
{
InitializeCriticalSection(&m_crit);
}
WinThreader::WinMutex::~WinMutex()
{
DeleteCriticalSection(&m_crit);
}
bool WinThreader::WinMutex::TryLock()
{
return (TryEnterCriticalSection(&m_crit) != FALSE);
}
void WinThreader::WinMutex::Lock()
{
EnterCriticalSection(&m_crit);
}
void WinThreader::WinMutex::Unlock()
{
LeaveCriticalSection(&m_crit);
}
void WinThreader::WinMutex::DestroyThis()
{
delete this;
} }
/****************** /******************
* Thread Handles * * Thread Handles *
******************/ ******************/
WinThreader::ThreadHandle::ThreadHandle(IThreader *parent, HANDLE hthread, IThread *run, const ThreadParams *params) : WinThreader::ThreadHandle::ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params) :
m_parent(parent), m_thread(hthread), m_run(run), m_params(*params), m_parent(parent), m_run(run), m_params(*params),
m_state(Thread_Paused) m_state(Thread_Paused)
{ {
InitializeCriticalSection(&m_crit);
} }
WinThreader::ThreadHandle::~ThreadHandle() WinThreader::ThreadHandle::~ThreadHandle()
{ {
if (m_thread)
{
CloseHandle(m_thread);
m_thread = NULL;
}
DeleteCriticalSection(&m_crit);
} }
bool WinThreader::ThreadHandle::WaitForThread() bool WinThreader::ThreadHandle::WaitForThread()
{ {
if (m_thread == NULL) if (!m_thread)
return false;
if (WaitForSingleObject(m_thread, INFINITE) != 0)
return false; return false;
m_thread->Join();
return true; return true;
} }
ThreadState WinThreader::ThreadHandle::GetState() ThreadState WinThreader::ThreadHandle::GetState()
{ {
ThreadState state; return m_state;
EnterCriticalSection(&m_crit);
state = m_state;
LeaveCriticalSection(&m_crit);
return state;
} }
IThreadCreator *WinThreader::ThreadHandle::Parent() IThreadCreator *WinThreader::ThreadHandle::Parent()
@ -255,21 +182,18 @@ ThreadPriority WinThreader::ThreadHandle::GetPriority()
bool WinThreader::ThreadHandle::SetPriority(ThreadPriority prio) bool WinThreader::ThreadHandle::SetPriority(ThreadPriority prio)
{ {
if (!m_thread)
return false;
BOOL res = FALSE; BOOL res = FALSE;
if (prio >= ThreadPrio_Maximum) if (prio >= ThreadPrio_Maximum)
res = SetThreadPriority(m_thread, THREAD_PRIORITY_HIGHEST); res = SetThreadPriority(m_thread->handle(), THREAD_PRIORITY_HIGHEST);
else if (prio <= ThreadPrio_Minimum) else if (prio <= ThreadPrio_Minimum)
res = SetThreadPriority(m_thread, THREAD_PRIORITY_LOWEST); res = SetThreadPriority(m_thread->handle(), THREAD_PRIORITY_LOWEST);
else if (prio == ThreadPrio_Normal) else if (prio == ThreadPrio_Normal)
res = SetThreadPriority(m_thread, THREAD_PRIORITY_NORMAL); res = SetThreadPriority(m_thread->handle(), THREAD_PRIORITY_NORMAL);
else if (prio == ThreadPrio_High) else if (prio == ThreadPrio_High)
res = SetThreadPriority(m_thread, THREAD_PRIORITY_ABOVE_NORMAL); res = SetThreadPriority(m_thread->handle(), THREAD_PRIORITY_ABOVE_NORMAL);
else if (prio == ThreadPrio_Low) else if (prio == ThreadPrio_Low)
res = SetThreadPriority(m_thread, THREAD_PRIORITY_BELOW_NORMAL); res = SetThreadPriority(m_thread->handle(), THREAD_PRIORITY_BELOW_NORMAL);
m_params.prio = prio; m_params.prio = prio;
@ -278,43 +202,11 @@ bool WinThreader::ThreadHandle::SetPriority(ThreadPriority prio)
bool WinThreader::ThreadHandle::Unpause() bool WinThreader::ThreadHandle::Unpause()
{ {
if (!m_thread)
return false;
if (m_state != Thread_Paused) if (m_state != Thread_Paused)
return false; return false;
ke::AutoLock lock(&suspend_);
m_state = Thread_Running; m_state = Thread_Running;
suspend_.Notify();
if (ResumeThread(m_thread) == -1)
{
m_state = Thread_Paused;
return false;
}
return true; return true;
} }
/*****************
* EVENT SIGNALS *
*****************/
WinThreader::WinEvent::~WinEvent()
{
CloseHandle(m_event);
}
void WinThreader::WinEvent::Wait()
{
WaitForSingleObject(m_event, INFINITE);
}
void WinThreader::WinEvent::Signal()
{
SetEvent(m_event);
}
void WinThreader::WinEvent::DestroyThis()
{
delete this;
}

View File

@ -32,22 +32,22 @@
#ifndef _INCLUDE_WINTHREADS_H_ #ifndef _INCLUDE_WINTHREADS_H_
#define _INCLUDE_WINTHREADS_H_ #define _INCLUDE_WINTHREADS_H_
#include <ke_thread_utils.h>
#include <ke_utility.h>
#include <windows.h> #include <windows.h>
#include "IThreader.h" #include "IThreader.h"
using namespace SourceMod; using namespace SourceMod;
DWORD WINAPI Win32_ThreadGate(LPVOID param);
class WinThreader : public IThreader class WinThreader : public IThreader
{ {
public: public:
class ThreadHandle : public IThreadHandle class ThreadHandle : public IThreadHandle, public ke::IRunnable
{ {
friend class WinThreader; friend class WinThreader;
friend DWORD WINAPI Win32_ThreadGate(LPVOID param); friend DWORD WINAPI Win32_ThreadGate(LPVOID param);
public: public:
ThreadHandle(IThreader *parent, HANDLE hthread, IThread *run, const ThreadParams *params); ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params);
virtual ~ThreadHandle(); virtual ~ThreadHandle();
public: public:
virtual bool WaitForThread(); virtual bool WaitForThread();
@ -58,41 +58,16 @@ public:
virtual bool SetPriority(ThreadPriority prio); virtual bool SetPriority(ThreadPriority prio);
virtual ThreadState GetState(); virtual ThreadState GetState();
virtual bool Unpause(); virtual bool Unpause();
virtual void Run();
protected: protected:
IThreader *m_parent; //Parent handle IThreader *m_parent; //Parent handle
HANDLE m_thread; //Windows HANDLE ke::AutoPtr<ke::Thread> m_thread;
ThreadParams m_params; //Current Parameters ThreadParams m_params; //Current Parameters
IThread *m_run; //Runnable context IThread *m_run; //Runnable context
ThreadState m_state; //internal state ThreadState m_state; //internal state
CRITICAL_SECTION m_crit; ke::ConditionVariable suspend_;
};
class WinMutex : public IMutex
{
public:
WinMutex();
virtual ~WinMutex();
public:
virtual bool TryLock();
virtual void Lock();
virtual void Unlock();
virtual void DestroyThis();
protected:
CRITICAL_SECTION m_crit;
};
class WinEvent : public IEventSignal
{
public:
WinEvent(HANDLE event) : m_event(event)
{
};
virtual ~WinEvent();
public:
virtual void Wait();
virtual void Signal();
virtual void DestroyThis();
public:
HANDLE m_event;
}; };
public: public:
IMutex *MakeMutex(); IMutex *MakeMutex();
void MakeThread(IThread *pThread); void MakeThread(IThread *pThread);

View File

@ -38,6 +38,7 @@
*/ */
#include <IShareSys.h> #include <IShareSys.h>
#include <ke_thread_utils.h>
#define SMINTERFACE_THREADER_NAME "IThreader" #define SMINTERFACE_THREADER_NAME "IThreader"
#define SMINTERFACE_THREADER_VERSION 3 #define SMINTERFACE_THREADER_VERSION 3

View File

@ -20,6 +20,7 @@
#include <assert.h> #include <assert.h>
#if defined(_MSC_VER) #if defined(_MSC_VER)
# include <windows.h> # include <windows.h>
# include <WinBase.h>
#else #else
# include <pthread.h> # include <pthread.h>
#endif #endif
@ -56,8 +57,13 @@
// When Wait() returns, the lock is automatically re-acquired. This operation // When Wait() returns, the lock is automatically re-acquired. This operation
// is NOT atomic. In between waking up and re-acquiring the lock, another // is NOT atomic. In between waking up and re-acquiring the lock, another
// thread may steal the lock and issue another event. Applications must // thread may steal the lock and issue another event. Applications must
// account for this. For example, a message pump should check that there are no // account for this. For example, a message pump should check that there are
// messages left to process before blocking again. // no messages left to process before blocking again.
//
// Likewise, it is also not defined whether a Signal() will have any effect
// while a thread is not waiting on the monitor. This is yet another reason
// the above paragraph is so important - applications should, under a lock of
// the condition variable - check for state changes before waiting.
// //
// -- Threads -- // -- Threads --
// //
@ -72,13 +78,13 @@
namespace ke { namespace ke {
// Abstraction for accessing the current thread. // Abstraction for getting a unique thread identifier. Debug-only.
#if defined(_MSC_VER) #if defined(_MSC_VER)
typedef HANDLE ThreadId; typedef DWORD ThreadId;
static inline ThreadId GetCurrentThreadId() static inline ThreadId GetCurrentThreadId()
{ {
return GetCurrentThread(); return ::GetCurrentThreadId();
} }
#else #else
typedef pthread_t ThreadId; typedef pthread_t ThreadId;
@ -238,4 +244,3 @@ class IRunnable
#endif #endif
#endif // _include_sourcepawn_threads_ #endif // _include_sourcepawn_threads_

View File

@ -120,6 +120,10 @@ class Thread
WaitForSingleObject(thread_, INFINITE); WaitForSingleObject(thread_, INFINITE);
} }
HANDLE handle() const {
return thread_;
}
private: private:
static DWORD WINAPI Main(LPVOID arg) { static DWORD WINAPI Main(LPVOID arg) {
((IRunnable *)arg)->Run(); ((IRunnable *)arg)->Run();

View File

@ -22,6 +22,7 @@
#include <assert.h> #include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h>
#if defined(_MSC_VER) #if defined(_MSC_VER)
# include <intrin.h> # include <intrin.h>
#endif #endif
@ -56,6 +57,8 @@ ReturnAndVoid(T &t)
return saved; return saved;
} }
// Wrapper that automatically deletes its contents. The pointer can be taken
// to avoid destruction.
template <typename T> template <typename T>
class AutoPtr class AutoPtr
{ {
@ -89,6 +92,9 @@ class AutoPtr
delete t_; delete t_;
t_ = t; t_ = t;
} }
bool operator !() const {
return !t_;
}
}; };
// Bob Jenkin's one-at-a-time hash function[1]. // Bob Jenkin's one-at-a-time hash function[1].