From efe93fa470f65536933c27baf7cb4b5f81b746cf Mon Sep 17 00:00:00 2001 From: David Anderson Date: Fri, 19 Jan 2007 04:35:56 +0000 Subject: [PATCH] initial import of threading extension --HG-- extra : convert_revision : svn%3A39bc706e-5318-0410-9160-8a85361fbb7c/trunk%40327 --- extensions/threader/extension.cpp | 14 + extensions/threader/extension.h | 59 +++ extensions/threader/msvc8/threader.sln | 20 ++ extensions/threader/msvc8/threader.vcproj | 267 ++++++++++++++ extensions/threader/smsdk_config.h | 27 ++ extensions/threader/smsdk_ext.cpp | 279 +++++++++++++++ extensions/threader/smsdk_ext.h | 142 ++++++++ extensions/threader/thread/BaseWorker.cpp | 249 +++++++++++++ extensions/threader/thread/BaseWorker.h | 72 ++++ extensions/threader/thread/IThreader.h | 378 ++++++++++++++++++++ extensions/threader/thread/PosixThreads.cpp | 263 ++++++++++++++ extensions/threader/thread/PosixThreads.h | 82 +++++ extensions/threader/thread/ThreadSupport.h | 10 + extensions/threader/thread/ThreadWorker.cpp | 258 +++++++++++++ extensions/threader/thread/ThreadWorker.h | 40 +++ extensions/threader/thread/WinThreads.cpp | 305 ++++++++++++++++ extensions/threader/thread/WinThreads.h | 84 +++++ 17 files changed, 2549 insertions(+) create mode 100644 extensions/threader/extension.cpp create mode 100644 extensions/threader/extension.h create mode 100644 extensions/threader/msvc8/threader.sln create mode 100644 extensions/threader/msvc8/threader.vcproj create mode 100644 extensions/threader/smsdk_config.h create mode 100644 extensions/threader/smsdk_ext.cpp create mode 100644 extensions/threader/smsdk_ext.h create mode 100644 extensions/threader/thread/BaseWorker.cpp create mode 100644 extensions/threader/thread/BaseWorker.h create mode 100644 extensions/threader/thread/IThreader.h create mode 100644 extensions/threader/thread/PosixThreads.cpp create mode 100644 extensions/threader/thread/PosixThreads.h create mode 100644 extensions/threader/thread/ThreadSupport.h create mode 100644 extensions/threader/thread/ThreadWorker.cpp create mode 100644 extensions/threader/thread/ThreadWorker.h create mode 100644 extensions/threader/thread/WinThreads.cpp create mode 100644 extensions/threader/thread/WinThreads.h diff --git a/extensions/threader/extension.cpp b/extensions/threader/extension.cpp new file mode 100644 index 00000000..2d1cc3f6 --- /dev/null +++ b/extensions/threader/extension.cpp @@ -0,0 +1,14 @@ +#include "extension.h" +#include "thread/ThreadSupport.h" + +Sample g_Sample; +MainThreader g_Threader; + +SMEXT_LINK(&g_Sample); + +bool Sample::SDK_OnLoad(char *error, size_t err_max, bool late) +{ + g_pShareSys->AddInterface(myself, &g_Threader); + + return true; +} diff --git a/extensions/threader/extension.h b/extensions/threader/extension.h new file mode 100644 index 00000000..584dbdc1 --- /dev/null +++ b/extensions/threader/extension.h @@ -0,0 +1,59 @@ +#ifndef _INCLUDE_SOURCEMOD_EXTENSION_PROPER_H_ +#define _INCLUDE_SOURCEMOD_EXTENSION_PROPER_H_ + +#include "smsdk_ext.h" + +/** + * @brief Sample implementation of the SDK Extension. + * Note: Uncomment one of the pre-defined virtual functions in order to use it. + */ +class Sample : public SDKExtension +{ +public: + /** + * @brief This is called after the initial loading sequence has been processed. + * + * @param error Error message buffer. + * @param err_max Size of error message buffer. + * @param late Whether or not the module was loaded after map load. + * @return True to succeed loading, false to fail. + */ + virtual bool SDK_OnLoad(char *error, size_t err_max, bool late); + + /** + * @brief This is called right before the extension is unloaded. + */ + //virtual void SDK_OnUnload(); + + /** + * @brief This is called once all known extensions have been loaded. + * Note: It is is a good idea to add natives here, if any are provided. + */ + //virtual void SDK_OnAllLoaded(); + + /** + * @brief Called when the pause state is changed. + */ + //virtual void SDK_OnPauseChange(bool paused); + + /** + * @brief this is called when Core wants to know if your extension is working. + * + * @param error Error message buffer. + * @param err_max Size of error message buffer. + * @return True if working, false otherwise. + */ + //virtual void QueryRunning(char *error, size_t maxlength); +public: +#if defined SMEXT_CONF_METAMOD + /** + * Read smext_base.h for documentation on these. + */ + + //virtual bool SDK_OnMetamodLoad(char *error, size_t err_max, bool late); + //virtual bool SDK_OnMetamodUnload(char *error, size_t err_max); + //virtual bool SDK_OnMetamodPauseChange(bool paused, char *error, size_t err_max); +#endif +}; + +#endif //_INCLUDE_SOURCEMOD_EXTENSION_PROPER_H_ diff --git a/extensions/threader/msvc8/threader.sln b/extensions/threader/msvc8/threader.sln new file mode 100644 index 00000000..c2d90607 --- /dev/null +++ b/extensions/threader/msvc8/threader.sln @@ -0,0 +1,20 @@ + +Microsoft Visual Studio Solution File, Format Version 9.00 +# Visual Studio 2005 +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "threader", "threader.vcproj", "{C9F9E996-0C20-4D96-8E52-4530F41E22CE}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Win32 = Debug|Win32 + Release|Win32 = Release|Win32 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {C9F9E996-0C20-4D96-8E52-4530F41E22CE}.Debug|Win32.ActiveCfg = Debug|Win32 + {C9F9E996-0C20-4D96-8E52-4530F41E22CE}.Debug|Win32.Build.0 = Debug|Win32 + {C9F9E996-0C20-4D96-8E52-4530F41E22CE}.Release|Win32.ActiveCfg = Release|Win32 + {C9F9E996-0C20-4D96-8E52-4530F41E22CE}.Release|Win32.Build.0 = Release|Win32 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/extensions/threader/msvc8/threader.vcproj b/extensions/threader/msvc8/threader.vcproj new file mode 100644 index 00000000..d76a6ad7 --- /dev/null +++ b/extensions/threader/msvc8/threader.vcproj @@ -0,0 +1,267 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/extensions/threader/smsdk_config.h b/extensions/threader/smsdk_config.h new file mode 100644 index 00000000..13ebf9c3 --- /dev/null +++ b/extensions/threader/smsdk_config.h @@ -0,0 +1,27 @@ +#ifndef _INCLUDE_SOURCEMOD_EXTENSION_CONFIG_H_ +#define _INCLUDE_SOURCEMOD_EXTENSION_CONFIG_H_ + +/* Basic information exposed publically */ +#define SMEXT_CONF_NAME "Threader" +#define SMEXT_CONF_DESCRIPTION "Provides threading to other modules" +#define SMEXT_CONF_VERSION "1.0.0.0" +#define SMEXT_CONF_AUTHOR "AlliedModders" +#define SMEXT_CONF_URL "http://www.sourcemod.net/" +#define SMEXT_CONF_LOGTAG "THREADER" +#define SMEXT_CONF_LICENSE "GPL" +#define SMEXT_CONF_DATESTRING __DATE__ + +/** + * @brief Exposes plugin's main interface. + */ +#define SMEXT_LINK(name) SDKExtension *g_pExtensionIface = name; + +/** + * @brief Sets whether or not this plugin required Metamod. + * NOTE: Uncomment to enable, comment to disable. + * NOTE: This is enabled automatically if a Metamod build is chosen in + * the Visual Studio project. + */ +//#define SMEXT_CONF_METAMOD + +#endif //_INCLUDE_SOURCEMOD_EXTENSION_CONFIG_H_ diff --git a/extensions/threader/smsdk_ext.cpp b/extensions/threader/smsdk_ext.cpp new file mode 100644 index 00000000..01e3f42c --- /dev/null +++ b/extensions/threader/smsdk_ext.cpp @@ -0,0 +1,279 @@ +#include +#include "smsdk_ext.h" + +IShareSys *g_pShareSys = NULL; +IExtension *myself = NULL; +IHandleSys *g_pHandleSys = NULL; +ISourceMod *g_pSM = NULL; + +PLATFORM_EXTERN_C IExtensionInterface *GetSMExtAPI() +{ + return g_pExtensionIface; +} + +SDKExtension::SDKExtension() +{ +#if defined SMEXT_CONF_METAMOD + m_SourceMMLoaded = false; + m_WeAreUnloaded = false; + m_WeGotPauseChange = false; +#endif +} + +bool SDKExtension::OnExtensionLoad(IExtension *me, IShareSys *sys, char *error, size_t err_max, bool late) +{ + g_pShareSys = sys; + myself = me; + +#if defined SMEXT_CONF_METAMOD + m_WeAreUnloaded = true; + + if (!m_SourceMMLoaded) + { + if (error) + { + snprintf(error, err_max, "Metamod attach failed"); + } + return false; + } +#endif + + SM_GET_IFACE(HANDLESYSTEM, g_pHandleSys); + SM_GET_IFACE(SOURCEMOD, g_pSM); + + if (SDK_OnLoad(error, err_max, late)) + { +#if defined SMEXT_CONF_METAMOD + m_WeAreUnloaded = true; +#endif + return true; + } + + return false; +} + +bool SDKExtension::IsMetamodExtension() +{ +#if defined SMEXT_CONF_METAMOD + return true; +#else + return false; +#endif +} + +void SDKExtension::OnExtensionPauseChange(bool state) +{ +#if defined SMEXT_CONF_METAMOD + m_WeGotPauseChange = true; +#endif + SDK_OnPauseChange(state); +} + +void SDKExtension::OnExtensionsAllLoaded() +{ + SDK_OnAllLoaded(); +} + +void SDKExtension::OnExtensionUnload() +{ +#if defined SMEXT_CONF_METAMOD + m_WeAreUnloaded = true; +#endif + SDK_OnUnload(); +} + +const char *SDKExtension::GetExtensionAuthor() +{ + return SMEXT_CONF_AUTHOR; +} + +const char *SDKExtension::GetExtensionDateString() +{ + return SMEXT_CONF_DATESTRING; +} + +const char *SDKExtension::GetExtensionDescription() +{ + return SMEXT_CONF_DESCRIPTION; +} + +const char *SDKExtension::GetExtensionVerString() +{ + return SMEXT_CONF_VERSION; +} + +const char *SDKExtension::GetExtensionName() +{ + return SMEXT_CONF_NAME; +} + +const char *SDKExtension::GetExtensionTag() +{ + return SMEXT_CONF_LOGTAG; +} + +const char *SDKExtension::GetExtensionURL() +{ + return SMEXT_CONF_URL; +} + +bool SDKExtension::SDK_OnLoad(char *error, size_t err_max, bool late) +{ + return true; +} + +void SDKExtension::SDK_OnUnload() +{ +} + +void SDKExtension::SDK_OnPauseChange(bool paused) +{ +} + +void SDKExtension::SDK_OnAllLoaded() +{ +} + +#if defined SMEXT_CONF_METAMOD + +PluginId g_PLID = 0; +ISmmPlugin *g_PLAPI = NULL; +SourceHook::ISourceHook *g_SHPtr = NULL; +ISmmAPI *g_SMAPI = NULL; + +IVEngineServer *engine = NULL; +IServerGameDLL *gamedll = NULL; + +SMM_API void *PL_EXPOSURE(const char *name, int *code) +{ + if (name && !strcmp(name, PLAPI_NAME)) + { + if (code) + { + *code = IFACE_OK; + } + return static_cast(g_pExtensionIface); + } + + if (code) + { + *code = IFACE_FAILED; + } + + return NULL; +} + +bool SDKExtension::Load(PluginId id, ISmmAPI *ismm, char *error, size_t maxlen, bool late) +{ + PLUGIN_SAVEVARS(); + + GET_V_IFACE_ANY(serverFactory, gamedll, IServerGameDLL, INTERFACEVERSION_SERVERGAMEDLL); + GET_V_IFACE_CURRENT(engineFactory, engine, IVEngineServer, INTERFACEVERSION_VENGINESERVER); + + m_SourceMMLoaded = true; + + return SDK_OnMetamodLoad(error, maxlen, late); +} + +bool SDKExtension::Unload(char *error, size_t maxlen) +{ + if (!m_WeAreUnloaded) + { + if (error) + { + snprintf(error, maxlen, "This extension must be unloaded by SourceMod."); + } + return false; + } + + return SDK_OnMetamodUnload(error, maxlen); +} + +bool SDKExtension::Pause(char *error, size_t maxlen) +{ + if (!m_WeGotPauseChange) + { + if (error) + { + snprintf(error, maxlen, "This extension must be paused by SourceMod."); + } + return false; + } + + m_WeGotPauseChange = false; + + return SDK_OnMetamodPauseChange(true, error, maxlen); +} + +bool SDKExtension::Unpause(char *error, size_t maxlen) +{ + if (!m_WeGotPauseChange) + { + if (error) + { + snprintf(error, maxlen, "This extension must be unpaused by SourceMod."); + } + return false; + } + + m_WeGotPauseChange = false; + + return SDK_OnMetamodPauseChange(false, error, maxlen); +} + +const char *SDKExtension::GetAuthor() +{ + return GetExtensionAuthor(); +} + +const char *SDKExtension::GetDate() +{ + return GetExtensionDateString(); +} + +const char *SDKExtension::GetDescription() +{ + return GetExtensionDescription(); +} + +const char *SDKExtension::GetLicense() +{ + return SMEXT_CONF_LICENSE; +} + +const char *SDKExtension::GetLogTag() +{ + return GetExtensionTag(); +} + +const char *SDKExtension::GetName() +{ + return GetExtensionName(); +} + +const char *SDKExtension::GetURL() +{ + return GetExtensionURL(); +} + +const char *SDKExtension::GetVersion() +{ + return GetExtensionVerString(); +} + +bool SDKExtension::SDK_OnMetamodLoad(char *error, size_t err_max, bool late) +{ + return true; +} + +bool SDKExtension::SDK_OnMetamodUnload(char *error, size_t err_max) +{ + return true; +} + +bool SDKExtension::SDK_OnMetamodPauseChange(bool paused, char *error, size_t err_max) +{ + return true; +} + +#endif diff --git a/extensions/threader/smsdk_ext.h b/extensions/threader/smsdk_ext.h new file mode 100644 index 00000000..07362c78 --- /dev/null +++ b/extensions/threader/smsdk_ext.h @@ -0,0 +1,142 @@ +#ifndef _INCLUDE_SOURCEMOD_EXTENSION_BASESDK_H_ +#define _INCLUDE_SOURCEMOD_EXTENSION_BASESDK_H_ + +#include "smsdk_config.h" +#include +#include +#include +#include +#include + +#if defined SMEXT_CONF_METAMOD +#include +#include +#endif + +using namespace SourceMod; +using namespace SourcePawn; + +class SDKExtension : +#if defined SMEXT_CONF_METAMOD + public ISmmPlugin, +#endif + public IExtensionInterface +{ +public: + SDKExtension(); +public: + /** + * @brief This is called after the initial loading sequence has been processed. + * + * @param error Error message buffer. + * @param err_max Size of error message buffer. + * @param late Whether or not the module was loaded after map load. + * @return True to succeed loading, false to fail. + */ + virtual bool SDK_OnLoad(char *error, size_t err_max, bool late); + + /** + * @brief This is called right before the extension is unloaded. + */ + virtual void SDK_OnUnload(); + + /** + * @brief This is called once all known extensions have been loaded. + */ + virtual void SDK_OnAllLoaded(); + + /** + * @brief Called when the pause state is changed. + */ + virtual void SDK_OnPauseChange(bool paused); + +#if defined SMEXT_CONF_METAMOD + /** + * @brief Called when Metamod is attached, before the extension version is called. + * + * @param error Error buffer. + * @param err_max Maximum size of error buffer. + * @param late Whether or not Metamod considers this a late load. + * @return True to succeed, false to fail. + */ + virtual bool SDK_OnMetamodLoad(char *error, size_t err_max, bool late); + + /** + * @brief Called when Metamod is detaching, after the extension version is called. + * NOTE: By default this is blocked unless sent from SourceMod. + * + * @param error Error buffer. + * @param err_max Maximum size of error buffer. + * @return True to succeed, false to fail. + */ + virtual bool SDK_OnMetamodUnload(char *error, size_t err_max); + + /** + * @brief Called when Metamod's pause state is changing. + * NOTE: By default this is blocked unless sent from SourceMod. + * + * @param paused Pause state being set. + * @param error Error buffer. + * @param err_max Maximum size of error buffer. + * @return True to succeed, false to fail. + */ + virtual bool SDK_OnMetamodPauseChange(bool paused, char *error, size_t err_max); +#endif + +public: //IExtensionInterface + virtual bool OnExtensionLoad(IExtension *me, IShareSys *sys, char *error, size_t err_max, bool late); + virtual void OnExtensionUnload(); + virtual void OnExtensionsAllLoaded(); + virtual bool IsMetamodExtension(); + virtual void OnExtensionPauseChange(bool state); + virtual const char *GetExtensionName(); + virtual const char *GetExtensionURL(); + virtual const char *GetExtensionTag(); + virtual const char *GetExtensionAuthor(); + virtual const char *GetExtensionVerString(); + virtual const char *GetExtensionDescription(); + virtual const char *GetExtensionDateString(); +#if defined SMEXT_CONF_METAMOD +public: //ISmmPlugin + virtual bool Load(PluginId id, ISmmAPI *ismm, char *error, size_t maxlength, bool late); + virtual const char *GetAuthor(); + virtual const char *GetName(); + virtual const char *GetDescription(); + virtual const char *GetURL(); + virtual const char *GetLicense(); + virtual const char *GetVersion(); + virtual const char *GetDate(); + virtual const char *GetLogTag(); + virtual bool Unload(char *error, size_t maxlen); + virtual bool Pause(char *error, size_t maxlen); + virtual bool Unpause(char *error, size_t maxlen); +private: + bool m_SourceMMLoaded; + bool m_WeAreUnloaded; + bool m_WeGotPauseChange; +#endif +}; + +extern SDKExtension *g_pExtensionIface; + +extern IShareSys *g_pShareSys; +extern IExtension *myself; +extern IHandleSys *g_pHandleSys; +extern ISourceMod *g_pSM; + +#if defined SMEXT_CONF_METAMOD +PLUGIN_GLOBALVARS(); +extern IVEngineServer *engine; +extern IServerGameDLL *gamedll; +#endif + +#define SM_MKIFACE(name) SMINTERFACE_##name##_NAME, SMINTERFACE_##name##_VERSION +#define SM_GET_IFACE(prefix,addr) \ + if (!g_pShareSys->RequestInterface(SM_MKIFACE(prefix), myself, (SMInterface **)&addr)) { \ + if (error) { \ + snprintf(error, err_max, "Could not find interface: %s", SMINTERFACE_##prefix##_NAME); \ + } \ + return false; \ + } + +#endif //_INCLUDE_SOURCEMOD_EXTENSION_BASESDK_H_ diff --git a/extensions/threader/thread/BaseWorker.cpp b/extensions/threader/thread/BaseWorker.cpp new file mode 100644 index 00000000..819c5d02 --- /dev/null +++ b/extensions/threader/thread/BaseWorker.cpp @@ -0,0 +1,249 @@ +#include "BaseWorker.h" + +BaseWorker::BaseWorker() : + m_perFrame(SM_DEFAULT_THREADS_PER_FRAME), + m_state(Worker_Stopped) +{ +} + +BaseWorker::~BaseWorker() +{ + if (m_state != Worker_Stopped || m_state != Worker_Invalid) + Stop(true); + + if (m_ThreadQueue.size()) + Flush(true); +} + +void BaseWorker::MakeThread(IThread *pThread) +{ + ThreadParams pt; + + pt.flags = Thread_AutoRelease; + pt.prio = ThreadPrio_Normal; + + MakeThread(pThread, &pt); +} + +IThreadHandle *BaseWorker::MakeThread(IThread *pThread, ThreadFlags flags) +{ + ThreadParams pt; + + pt.flags = flags; + pt.prio = ThreadPrio_Normal; + + return MakeThread(pThread, &pt); +} + +IThreadHandle *BaseWorker::MakeThread(IThread *pThread, const ThreadParams *params) +{ + if (m_state != Worker_Running) + return NULL; + + SWThreadHandle *swt = new SWThreadHandle(this, params, pThread); + + AddThreadToQueue(swt); + + return swt; +} + +void BaseWorker::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) +{ + max = ThreadPrio_Normal; + min = ThreadPrio_Normal; +} + +unsigned int BaseWorker::Flush(bool flush_cancel) +{ + SWThreadHandle *swt; + unsigned int num = 0; + + while ((swt=PopThreadFromQueue()) != NULL) + { + swt->m_state = Thread_Done; + if (!flush_cancel) + swt->pThread->RunThread(swt); + swt->pThread->OnTerminate(swt, flush_cancel); + if (swt->m_params.flags & Thread_AutoRelease) + delete swt; + num++; + } + + return num; +} + +SWThreadHandle *BaseWorker::PopThreadFromQueue() +{ + if (!m_ThreadQueue.size()) + return NULL; + + SourceHook::List::iterator begin; + SWThreadHandle *swt; + + begin = m_ThreadQueue.begin(); + swt = (*begin); + m_ThreadQueue.erase(begin); + + return swt; +} + +void BaseWorker::AddThreadToQueue(SWThreadHandle *pHandle) +{ + m_ThreadQueue.push_back(pHandle); +} + +unsigned int BaseWorker::GetMaxThreadsPerFrame() +{ + return m_perFrame; +} + +WorkerState BaseWorker::GetStatus(unsigned int *threads) +{ + if (threads) + *threads = m_perFrame; + + return m_state; +} + +unsigned int BaseWorker::RunFrame() +{ + unsigned int done = 0; + unsigned int max = GetMaxThreadsPerFrame(); + SWThreadHandle *swt = NULL; + IThread *pThread = NULL; + + while (done < max) + { + if ((swt=PopThreadFromQueue()) == NULL) + break; + pThread = swt->pThread; + swt->m_state = Thread_Running; + pThread->RunThread(swt); + swt->m_state = Thread_Done; + pThread->OnTerminate(swt, false); + if (swt->m_params.flags & Thread_AutoRelease) + delete swt; + done++; + } + + return done; +} + +void BaseWorker::SetMaxThreadsPerFrame(unsigned int threads) +{ + m_perFrame = threads; +} + +bool BaseWorker::Start() +{ + if (m_state != Worker_Invalid && m_state != Worker_Stopped) + { + return false; + } + + m_state = Worker_Running; + + return true; +} + +bool BaseWorker::Stop(bool flush_cancel) +{ + if (m_state == Worker_Invalid || m_state == Worker_Stopped) + return false; + + if (m_state == Worker_Paused) + { + if (!Unpause()) + return false; + } + + m_state = Worker_Stopped; + Flush(flush_cancel); + + return true; +} + +bool BaseWorker::Pause() +{ + if (m_state != Worker_Running) + return false; + + m_state = Worker_Paused; + + return true; +} + + +bool BaseWorker::Unpause() +{ + if (m_state != Worker_Paused) + return false; + + m_state = Worker_Running; + + return true; +} + +/*********************** + * THREAD HANDLE STUFF * + ***********************/ + +void SWThreadHandle::DestroyThis() +{ + delete this; +} + +void SWThreadHandle::GetParams(ThreadParams *p) +{ + *p = m_params; +} + +ThreadPriority SWThreadHandle::GetPriority() +{ + return m_params.prio; +} + +ThreadState SWThreadHandle::GetState() +{ + return m_state; +} + +IThreadCreator *SWThreadHandle::Parent() +{ + return m_parent; +} + +bool SWThreadHandle::SetPriority(ThreadPriority prio) +{ + if (m_params.prio != ThreadPrio_Normal) + return false; + + m_params.prio = prio; + + return true; +} + +bool SWThreadHandle::Unpause() +{ + if (m_state != Thread_Paused) + return false; + + m_state = Thread_Running; + + return true; +} + +bool SWThreadHandle::WaitForThread() +{ + return false; +} + +SWThreadHandle::SWThreadHandle(IThreadCreator *parent, const ThreadParams *p, IThread *thread) : + m_state(Thread_Paused), m_params(*p), m_parent(parent), pThread(thread) +{ +} + +IThread *SWThreadHandle::GetThread() +{ + return pThread; +} diff --git a/extensions/threader/thread/BaseWorker.h b/extensions/threader/thread/BaseWorker.h new file mode 100644 index 00000000..5e91c28b --- /dev/null +++ b/extensions/threader/thread/BaseWorker.h @@ -0,0 +1,72 @@ +#ifndef _INCLUDE_SOURCEMOD_BASEWORKER_H +#define _INCLUDE_SOURCEMOD_BASEWORKER_H + +#include "sh_list.h" +#include "ThreadSupport.h" + +#define SM_DEFAULT_THREADS_PER_FRAME 1 + +class BaseWorker; + +//SW = Simple Wrapper +class SWThreadHandle : public IThreadHandle +{ + friend class BaseWorker; +public: + SWThreadHandle(IThreadCreator *parent, const ThreadParams *p, IThread *thread); + IThread *GetThread(); +public: + //NOTE: We don't support this by default. + //It's specific usage that'd require many mutexes + virtual bool WaitForThread(); +public: + virtual void DestroyThis(); + virtual IThreadCreator *Parent(); + virtual void GetParams(ThreadParams *ptparams); +public: + //Priorities not supported by default. + virtual ThreadPriority GetPriority(); + virtual bool SetPriority(ThreadPriority prio); +public: + virtual ThreadState GetState(); + virtual bool Unpause(); +private: + ThreadState m_state; + ThreadParams m_params; + IThreadCreator *m_parent; + IThread *pThread; +}; + +class BaseWorker : public IThreadWorker +{ +public: + BaseWorker(); + virtual ~BaseWorker(); +public: //IWorker + virtual unsigned int RunFrame(); + //Controls the worker + virtual bool Pause(); + virtual bool Unpause(); + virtual bool Start(); + virtual bool Stop(bool flush_cancel); + //Flushes out any remaining threads + virtual unsigned int Flush(bool flush_cancel); + //returns status and number of threads in queue + virtual WorkerState GetStatus(unsigned int *numThreads); +public: //IThreadCreator + virtual void MakeThread(IThread *pThread); + virtual IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags); + virtual IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params); + virtual void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min); +public: //BaseWorker + virtual void AddThreadToQueue(SWThreadHandle *pHandle); + virtual SWThreadHandle *PopThreadFromQueue(); + virtual void SetMaxThreadsPerFrame(unsigned int threads); + virtual unsigned int GetMaxThreadsPerFrame(); +protected: + SourceHook::List m_ThreadQueue; + unsigned int m_perFrame; + volatile WorkerState m_state; +}; + +#endif //_INCLUDE_SOURCEMOD_BASEWORKER_H diff --git a/extensions/threader/thread/IThreader.h b/extensions/threader/thread/IThreader.h new file mode 100644 index 00000000..90e401fe --- /dev/null +++ b/extensions/threader/thread/IThreader.h @@ -0,0 +1,378 @@ +#ifndef _INCLUDE_SOURCEMOD_THREADER_H +#define _INCLUDE_SOURCEMOD_THREADER_H + +#include + +#define SMINTERFACE_THREADER_NAME "IThreader" +#define SMINTERFACE_THREADER_VERSION 1 + +namespace SourceMod +{ + /** + * @brief Thread creation flags + */ + enum ThreadFlags + { + Thread_Default = 0, + /** + * Auto-release handle on finish + * You are not guaranteed the handle for this is valid after + * calling MakeThread(), so never use it until OnTerminate is called. + */ + Thread_AutoRelease = 1, + /** + * Thread is created "suspended", meaning it is inactive until unpaused. + */ + Thread_CreateSuspended = 2, + }; + + /** + * @brief Specifies thread priority levels. + */ + enum ThreadPriority + { + ThreadPrio_Minimum = -8, + ThreadPrio_Low = -3, + ThreadPrio_Normal = 0, + ThreadPrio_High = 3, + ThreadPrio_Maximum = 8, + }; + + /** + * @brief The current state of a thread. + */ + enum ThreadState + { + Thread_Running = 0, + Thread_Paused = 1, + Thread_Done = 2, + }; + + /** + * @brief Thread-specific parameters. + */ + struct ThreadParams + { + ThreadParams() : + flags(Thread_Default), + prio(ThreadPrio_Normal) + { + }; + ThreadFlags flags; + ThreadPriority prio; + }; + + class IThreadCreator; + + /** + * @brief Describes a handle to a thread. + */ + class IThreadHandle + { + public: + virtual ~IThreadHandle() { }; + public: + /** + * @brief Pauses parent thread until this thread completes. + * + * @return True if successful, false otherwise. + */ + virtual bool WaitForThread() =0; + + /** + * @brief Destroys the thread handle. This will not necessarily cancel the thread. + */ + virtual void DestroyThis() =0; + + /** + * @brief Returns the parent threader. + * + * @return IThreadCreator that created this thread. + */ + virtual IThreadCreator *Parent() =0; + + /** + * @brief Returns the thread states. + * + * @param ptparmas Pointer to a ThreadParams buffer. + */ + virtual void GetParams(ThreadParams *ptparams) =0; + + /** + * @brief Returns the thread priority. + * + * @return Thread priority. + */ + virtual ThreadPriority GetPriority() =0; + + /** + * @brief Sets thread priority. + * NOTE: On Linux, this always returns false. + * + * @param prio Thread priority to set. + * @return True if successful, false otherwise. + */ + virtual bool SetPriority(ThreadPriority prio) =0; + + /** + * @brief Returns the thread state. + * + * @return Current thread state. + */ + virtual ThreadState GetState() =0; + + /** + * @brief Attempts to unpause a paused thread. + * + * @return True on success, false otherwise. + */ + virtual bool Unpause() =0; + }; + + /** + * @brief Handles a single thread's execution. + */ + class IThread + { + public: + virtual ~IThread() { }; + public: + /** + * @brief Called when the thread runs (in its own thread). + * + * @param pHandle Pointer to the thread's handle. + */ + virtual void RunThread(IThreadHandle *pHandle) =0; + + /** + * @param Called when the thread terminates. + * Note: This occurs inside the thread as well. + * + * @param pHandle Pointer to the thread's handle. + * @param cancel True if the thread did not finish, false otherwise. + */ + virtual void OnTerminate(IThreadHandle *pHandle, bool cancel) =0; + }; + + + /** + * @brief Describes a thread creator + */ + class IThreadCreator + { + public: + virtual ~IThreadCreator() { }; + public: + /** + * @brief Creates a basic thread. + * + * @param pThread IThread pointer for callbacks. + */ + virtual void MakeThread(IThread *pThread) =0; + + /** + * @brief Creates a thread with specific options. + * + * @param pThread IThread pointer for callbacks. + * @param flags Flags for the thread. + * @return IThreadHandle pointer (must be released). + */ + virtual IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags) =0; + + /** + * @brief Creates a thread with specific options. + * + * @param pThread IThread pointer for callbacks. + * @param params Extended options for the thread. + * @return IThreadHandle pointer (must be released). + */ + virtual IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params) =0; + + /** + * @brief Returns the priority bounds. + * Note: On Linux, the min and max are both Thread_Normal. + * + * @param max Stores the maximum priority level. + * @param min Stores the minimum priority level. + */ + virtual void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) =0; + }; + + /** + * @brief Describes a simple locking mutex. + */ + class IMutex + { + public: + virtual ~IMutex() { }; + public: + /** + * @brief Attempts to lock, but returns instantly. + * + * @return True if lock was obtained, false otherwise. + */ + virtual bool TryLock() =0; + + /** + * @brief Attempts to lock by waiting for release. + */ + virtual void Lock() =0; + + /** + * @brief Unlocks the mutex. + */ + virtual void Unlock() =0; + + /** + * @brief Destroys the mutex handle. + */ + virtual void DestroyThis() =0; + }; + + /** + * @brief Describes a simple "condition variable"/signal lock. + */ + class IEventSignal + { + public: + virtual ~IEventSignal() { }; + public: + /** + * @brief Waits for a signal. + */ + virtual void Wait() =0; + + /** + * @brief Triggers the signal and resets the signal after triggering. + */ + virtual void Signal() =0; + + /** + * @brief Frees the signal handle. + */ + virtual void DestroyThis() =0; + }; + + /** + * @brief Describes possible worker states + */ + enum WorkerState + { + Worker_Invalid = -3, + Worker_Stopped = -2, + Worker_Paused = -1, + Worker_Running, + }; + + /** + * @brief This is a "worker pool." A single thread places tasks in a queue. + * Each IThread is then a task, rather than its own separate thread. + */ + class IThreadWorker : public IThreadCreator + { + public: + virtual ~IThreadWorker() + { + }; + public: + /** + * @brief Runs one "frame" of the worker. + * + * @return Number of tasks processed. + */ + virtual unsigned int RunFrame() =0; + public: + /** + * @brief Pauses the worker. + * + * @return True on success, false otherwise. + */ + virtual bool Pause() =0; + + /** + * @brief Unpauses the worker. + * + * @return True on success, false otherwise. + */ + virtual bool Unpause() =0; + + /** + * @brief Starts the worker thread. + * + * @return True on success, false otherwise. + */ + virtual bool Start() =0; + + /** + * @brief Stops the worker thread. + * + * @param flush If true, all remaining tasks will be cancelled. + * Otherwise, the threader will wait until the queue is empty. + * @return True on success, false otherwise. + */ + virtual bool Stop(bool flush) =0; + + /** + * @brief Returns the status of the worker. + * + * @param numThreads Pointer to store number of threads in the queue. + * @return State of the worker. + */ + virtual WorkerState GetStatus(unsigned int *numThreads) =0; + }; + + /** + * @brief Describes a threading system + */ + class IThreader : public SMInterface, public IThreadCreator + { + public: + virtual const char *GetInterfaceName() + { + return SMINTERFACE_THREADER_NAME; + } + virtual unsigned int GetInterfaceVersion() + { + return SMINTERFACE_THREADER_VERSION; + } + public: + /** + * @brief Creates a mutex (mutual exclusion lock). + * + * @return A new IMutex pointer (must be destroyed). + */ + virtual IMutex *MakeMutex() =0; + + /** + * @brief Sleeps the calling thread for a number of milliseconds. + * + * @param ms Millisecond count to sleep. + */ + virtual void ThreadSleep(unsigned int ms) =0; + + /** + * @brief Creates a non-signalled event. + * + * @return A new IEventSignal pointer (must be destroyed). + */ + virtual IEventSignal *MakeEventSignal() =0; + + /** + * @brief Creates a thread worker. + * + * @param threaded If true, the worker will be threaded. + * If false, the worker will require manual frame execution. + * @return A new IThreadWorker pointer (must be destroyed). + */ + virtual IThreadWorker *MakeWorker(bool threaded) =0; + + /** + * @brief Destroys an IThreadWorker pointer. + * + * @param pWorker IThreadWorker pointer to destroy. + */ + virtual void DestroyWorker(IThreadWorker *pWorker) =0; + }; +}; + +#endif //_INCLUDE_SOURCEMOD_THREADER_H diff --git a/extensions/threader/thread/PosixThreads.cpp b/extensions/threader/thread/PosixThreads.cpp new file mode 100644 index 00000000..f08b9210 --- /dev/null +++ b/extensions/threader/thread/PosixThreads.cpp @@ -0,0 +1,263 @@ +#include +#include "PosixThreads.h" + +void PosixThreader::ThreadSleep(unsigned int ms) +{ + usleep( ms * 1000 ); +} + +void PosixThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) +{ + max = ThreadPrio_Normal; + min = ThreadPrio_Normal; +} + +IMutex *PosixThreader::MakeMutex() +{ + pthread_mutex_t mutex; + + if (pthread_mutex_init(&mutex, NULL) != 0) + return NULL; + + PosixMutex *pMutex = new PosixMutex(mutex); + + return pMutex; +} + + +void PosixThreader::MakeThread(IThread *pThread) +{ + ThreadParams defparams; + + defparams.flags = Thread_AutoRelease; + defparams.prio = ThreadPrio_Normal; + + MakeThread(pThread, &defparams); +} + +IThreadHandle *PosixThreader::MakeThread(IThread *pThread, ThreadFlags flags) +{ + ThreadParams defparams; + + defparams.flags = flags; + defparams.prio = ThreadPrio_Normal; + + return MakeThread(pThread, &defparams); +} + +void *Posix_ThreadGate(void *param) +{ + PosixThreader::ThreadHandle *pHandle = + reinterpret_cast(param); + + //Block this thread from being started initially. + pthread_mutex_lock(&pHandle->m_runlock); + //if we get here, we've obtained the lock and are allowed to run. + //unlock and continue. + pthread_mutex_unlock(&pHandle->m_runlock); + + pHandle->m_run->RunThread(pHandle); + + ThreadParams params; + pthread_mutex_lock(&pHandle->m_statelock); + pHandle->m_state = Thread_Done; + pHandle->GetParams(¶ms); + pthread_mutex_unlock(&pHandle->m_statelock); + + pHandle->m_run->OnTerminate(pHandle, false); + if (params.flags & Thread_AutoRelease) + delete pHandle; + + return 0; +} + +ThreadParams g_defparams; +IThreadHandle *PosixThreader::MakeThread(IThread *pThread, const ThreadParams *params) +{ + if (params == NULL) + params = &g_defparams; + + PosixThreader::ThreadHandle *pHandle = + new PosixThreader::ThreadHandle(this, pThread, params); + + pthread_mutex_lock(&pHandle->m_runlock); + + int err; + err = pthread_create(&pHandle->m_thread, NULL, Posix_ThreadGate, (void *)pHandle); + + if (err != 0) + { + pthread_mutex_unlock(&pHandle->m_runlock); + delete pHandle; + return NULL; + } + + //Don't bother setting priority... + + if (!(pHandle->m_params.flags & Thread_CreateSuspended)) + { + pHandle->m_state = Thread_Running; + err = pthread_mutex_unlock(&pHandle->m_runlock); + if (err != 0) + pHandle->m_state = Thread_Paused; + } + + return pHandle; +} + +IEventSignal *PosixThreader::MakeEventSignal() +{ + return new PosixEventSignal(); +} + +/***************** +**** Mutexes **** +*****************/ + +PosixThreader::PosixMutex::~PosixMutex() +{ + pthread_mutex_destroy(&m_mutex); +} + +bool PosixThreader::PosixMutex::TryLock() +{ + int err = pthread_mutex_trylock(&m_mutex); + + return (err == 0); +} + +void PosixThreader::PosixMutex::Lock() +{ + pthread_mutex_lock(&m_mutex); +} + +void PosixThreader::PosixMutex::Unlock() +{ + pthread_mutex_unlock(&m_mutex); +} + +void PosixThreader::PosixMutex::DestroyThis() +{ + delete this; +} + +/****************** +* Thread Handles * +******************/ + +PosixThreader::ThreadHandle::ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params) : + m_parent(parent), m_params(*params), m_run(run), m_state(Thread_Paused) +{ + pthread_mutex_init(&m_runlock, NULL); + pthread_mutex_init(&m_statelock, NULL); +} + +PosixThreader::ThreadHandle::~ThreadHandle() +{ + pthread_mutex_destroy(&m_runlock); + pthread_mutex_destroy(&m_statelock); +} + +bool PosixThreader::ThreadHandle::WaitForThread() +{ + void *arg; + + if (pthread_join(m_thread, &arg) != 0) + return false; + + return true; +} + +ThreadState PosixThreader::ThreadHandle::GetState() +{ + ThreadState state; + + pthread_mutex_lock(&m_statelock); + state = m_state; + pthread_mutex_unlock(&m_statelock); + + return state; +} + +IThreadCreator *PosixThreader::ThreadHandle::Parent() +{ + return m_parent; +} + +void PosixThreader::ThreadHandle::DestroyThis() +{ + if (m_params.flags & Thread_AutoRelease) + return; + + delete this; +} + +void PosixThreader::ThreadHandle::GetParams(ThreadParams *ptparams) +{ + if (!ptparams) + return; + + *ptparams = m_params; +} + +ThreadPriority PosixThreader::ThreadHandle::GetPriority() +{ + return ThreadPrio_Normal; +} + +bool PosixThreader::ThreadHandle::SetPriority(ThreadPriority prio) +{ + return (prio == ThreadPrio_Normal); +} + +bool PosixThreader::ThreadHandle::Unpause() +{ + if (m_state != Thread_Paused) + return false; + + m_state = Thread_Running; + + if (pthread_mutex_unlock(&m_runlock) != 0) + { + m_state = Thread_Paused; + return false; + } + + return true; +} + +/***************** + * EVENT SIGNALS * + *****************/ + +PosixThreader::PosixEventSignal::PosixEventSignal() +{ + pthread_cond_init(&m_cond, NULL); + pthread_mutex_init(&m_mutex, NULL); +} + +PosixThreader::PosixEventSignal::~PosixEventSignal() +{ + pthread_cond_destroy(&m_cond); + pthread_mutex_destroy(&m_mutex); +} + +void PosixThreader::PosixEventSignal::Wait() +{ + pthread_mutex_lock(&m_mutex); + pthread_cond_wait(&m_cond, &m_mutex); + pthread_mutex_unlock(&m_mutex); +} + +void PosixThreader::PosixEventSignal::Signal() +{ + pthread_mutex_lock(&m_mutex); + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); +} + +void PosixThreader::PosixEventSignal::DestroyThis() +{ + delete this; +} + diff --git a/extensions/threader/thread/PosixThreads.h b/extensions/threader/thread/PosixThreads.h new file mode 100644 index 00000000..30a5ee75 --- /dev/null +++ b/extensions/threader/thread/PosixThreads.h @@ -0,0 +1,82 @@ +#ifndef _INCLUDE_POSIXTHREADS_H_ +#define _INCLUDE_POSIXTHREADS_H_ + +#include +#include "IThreader.h" + +using namespace SourceMod; + +void *Posix_ThreadGate(void *param); + +class PosixThreader : public IThreader +{ +public: + class ThreadHandle : public IThreadHandle + { + friend class PosixThreader; + friend void *Posix_ThreadGate(void *param); + public: + ThreadHandle(IThreader *parent, IThread *run, const ThreadParams *params); + virtual ~ThreadHandle(); + public: + virtual bool WaitForThread(); + virtual void DestroyThis(); + virtual IThreadCreator *Parent(); + virtual void GetParams(ThreadParams *ptparams); + virtual ThreadPriority GetPriority(); + virtual bool SetPriority(ThreadPriority prio); + virtual ThreadState GetState(); + virtual bool Unpause(); + protected: + IThreader *m_parent; //Parent handle + pthread_t m_thread; //Windows HANDLE + ThreadParams m_params; //Current Parameters + IThread *m_run; //Runnable context + pthread_mutex_t m_statelock; + pthread_mutex_t m_runlock; + ThreadState m_state; //internal state + }; + class PosixMutex : public IMutex + { + public: + PosixMutex(pthread_mutex_t m) : m_mutex(m) + { + }; + virtual ~PosixMutex(); + public: + virtual bool TryLock(); + virtual void Lock(); + virtual void Unlock(); + virtual void DestroyThis(); + protected: + pthread_mutex_t m_mutex; + }; + class PosixEventSignal : public IEventSignal + { + public: + PosixEventSignal(); + virtual ~PosixEventSignal(); + public: + virtual void Wait(); + virtual void Signal(); + virtual void DestroyThis(); + protected: + pthread_cond_t m_cond; + pthread_mutex_t m_mutex; + }; +public: + virtual IMutex *MakeMutex(); + virtual void MakeThread(IThread *pThread); + virtual IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags); + virtual IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params); + virtual void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min); + virtual void ThreadSleep(unsigned int ms); + virtual IEventSignal *MakeEventSignal(); +}; + +#if defined SM_DEFAULT_THREADER && !defined SM_MAIN_THREADER +#define SM_MAIN_THREADER PosixThreader; +typedef class PosixThreader MainThreader; +#endif + +#endif //_INCLUDE_POSIXTHREADS_H_ diff --git a/extensions/threader/thread/ThreadSupport.h b/extensions/threader/thread/ThreadSupport.h new file mode 100644 index 00000000..f948e6ce --- /dev/null +++ b/extensions/threader/thread/ThreadSupport.h @@ -0,0 +1,10 @@ +#ifndef _INCLUDE_SOURCEMOD_THREAD_SUPPORT_H +#define _INCLUDE_SOURCEMOD_THREAD_SUPPORT_H + +#if defined __linux__ +#include "PosixThreads.h" +#elif defined WIN32 +#include "WinThreads.h" +#endif + +#endif //_INCLUDE_SOURCEMOD_THREAD_SUPPORT_H diff --git a/extensions/threader/thread/ThreadWorker.cpp b/extensions/threader/thread/ThreadWorker.cpp new file mode 100644 index 00000000..fb298185 --- /dev/null +++ b/extensions/threader/thread/ThreadWorker.cpp @@ -0,0 +1,258 @@ +#include "ThreadWorker.h" + +ThreadWorker::ThreadWorker() : + m_Threader(NULL), + m_QueueLock(NULL), + m_StateLock(NULL), + m_PauseSignal(NULL), + m_AddSignal(NULL), + me(NULL), + m_think_time(DEFAULT_THINK_TIME_MS) +{ + m_state = Worker_Invalid; +} + +ThreadWorker::ThreadWorker(IThreader *pThreader, unsigned int thinktime) : + m_Threader(pThreader), + m_QueueLock(NULL), + m_StateLock(NULL), + m_PauseSignal(NULL), + m_AddSignal(NULL), + me(NULL), + m_think_time(thinktime) +{ + if (m_Threader) + { + m_state = Worker_Stopped; + } else { + m_state = Worker_Invalid; + } +} + +ThreadWorker::~ThreadWorker() +{ + if (m_state != Worker_Stopped || m_state != Worker_Invalid) + Stop(true); + + if (m_ThreadQueue.size()) + Flush(true); +} + +void ThreadWorker::OnTerminate(IThreadHandle *pHandle, bool cancel) +{ + //we don't particularly care + return; +} + +void ThreadWorker::RunThread(IThreadHandle *pHandle) +{ + WorkerState this_state = Worker_Running; + size_t num; + + while (true) + { + /** + * Check number of items in the queue + */ + m_StateLock->Lock(); + this_state = m_state; + m_StateLock->Unlock(); + if (this_state != Worker_Stopped) + { + m_QueueLock->Lock(); + num = m_ThreadQueue.size(); + if (!num) + { + /** + * if none, wait for an item + */ + m_Waiting = true; + m_QueueLock->Unlock(); + /* first check if we should end again */ + if (this_state == Worker_Stopped) + { + break; + } + m_AddSignal->Wait(); + m_Waiting = false; + } else { + m_QueueLock->Unlock(); + } + } + m_StateLock->Lock(); + this_state = m_state; + m_StateLock->Unlock(); + if (this_state != Worker_Running) + { + if (this_state == Worker_Paused || this_state == Worker_Stopped) + { + //wait until the lock is cleared. + if (this_state == Worker_Paused) + { + m_PauseSignal->Wait(); + } + if (this_state == Worker_Stopped) + { + //if we're supposed to flush cleanrly, + // run all of the remaining frames first. + if (!m_FlushType) + { + while (m_ThreadQueue.size()) + RunFrame(); + } + break; + } + } + } + /** + * Run the frame. + */ + RunFrame(); + + /** + * wait in between threads if specified + */ + if (m_think_time) + m_Threader->ThreadSleep(m_think_time); + } +} + +SWThreadHandle *ThreadWorker::PopThreadFromQueue() +{ + if (m_state <= Worker_Stopped && !m_QueueLock) + return NULL; + + SWThreadHandle *swt; + m_QueueLock->Lock(); + swt = BaseWorker::PopThreadFromQueue(); + m_QueueLock->Unlock(); + + return swt; +} + +void ThreadWorker::AddThreadToQueue(SWThreadHandle *pHandle) +{ + if (m_state <= Worker_Stopped) + return; + + m_QueueLock->Lock(); + BaseWorker::AddThreadToQueue(pHandle); + if (m_Waiting) + { + m_AddSignal->Signal(); + } + m_QueueLock->Unlock(); +} + +WorkerState ThreadWorker::GetStatus(unsigned int *threads) +{ + WorkerState state; + + m_StateLock->Lock(); + state = BaseWorker::GetStatus(threads); + m_StateLock->Unlock(); + + return state; +} + +bool ThreadWorker::Start() +{ + if (m_state == Worker_Invalid) + { + if (m_Threader == NULL) + return false; + } else if (m_state != Worker_Stopped) { + return false; + } + + m_Waiting = false; + m_QueueLock = m_Threader->MakeMutex(); + m_StateLock = m_Threader->MakeMutex(); + m_PauseSignal = m_Threader->MakeEventSignal(); + m_AddSignal = m_Threader->MakeEventSignal(); + m_state = Worker_Running; + ThreadParams pt; + pt.flags = Thread_Default; + pt.prio = ThreadPrio_Normal; + me = m_Threader->MakeThread(this, &pt); + + return true; +} + +bool ThreadWorker::Stop(bool flush_cancel) +{ + if (m_state == Worker_Invalid || m_state == Worker_Stopped) + return false; + + WorkerState oldstate; + + //set new state + m_StateLock->Lock(); + oldstate = m_state; + m_state = Worker_Stopped; + m_FlushType = flush_cancel; + m_StateLock->Unlock(); + + if (oldstate == Worker_Paused) + { + Unpause(); + } else { + m_QueueLock->Lock(); + if (m_Waiting) + { + m_AddSignal->Signal(); + } + m_QueueLock->Unlock(); + } + + me->WaitForThread(); + //destroy it + me->DestroyThis(); + //flush all remaining events + Flush(true); + + //free mutex locks + m_QueueLock->DestroyThis(); + m_StateLock->DestroyThis(); + m_PauseSignal->DestroyThis(); + m_AddSignal->DestroyThis(); + + //invalidizzle + m_QueueLock = NULL; + m_StateLock = NULL; + m_PauseSignal = NULL; + m_AddSignal = NULL; + me = NULL; + + return true; +} + +bool ThreadWorker::Pause() +{ + if (m_state != Worker_Running) + return false; + + m_StateLock->Lock(); + m_state = Worker_Paused; + m_StateLock->Unlock(); + + return true; +} + + +bool ThreadWorker::Unpause() +{ + if (m_state != Worker_Paused) + return false; + + m_StateLock->Lock(); + m_state = Worker_Running; + m_StateLock->Unlock(); + m_PauseSignal->Signal(); + if (m_Waiting) + { + m_AddSignal->Signal(); + } + + return true; +} diff --git a/extensions/threader/thread/ThreadWorker.h b/extensions/threader/thread/ThreadWorker.h new file mode 100644 index 00000000..6afd64f4 --- /dev/null +++ b/extensions/threader/thread/ThreadWorker.h @@ -0,0 +1,40 @@ +#ifndef _INCLUDE_SOURCEMOD_THREADWORKER_H +#define _INCLUDE_SOURCEMOD_THREADWORKER_H + +#include "BaseWorker.h" + +#define DEFAULT_THINK_TIME_MS 500 + +class ThreadWorker : public BaseWorker, public IThread +{ +public: + ThreadWorker(); + ThreadWorker(IThreader *pThreader, unsigned int thinktime=DEFAULT_THINK_TIME_MS); + virtual ~ThreadWorker(); +public: //IThread + virtual void OnTerminate(IThreadHandle *pHandle, bool cancel); + virtual void RunThread(IThreadHandle *pHandle); +public: //IWorker + //Controls the worker + virtual bool Pause(); + virtual bool Unpause(); + virtual bool Start(); + virtual bool Stop(bool flush_cancel); + //returns status and number of threads in queue + virtual WorkerState GetStatus(unsigned int *numThreads); +public: //BaseWorker + virtual void AddThreadToQueue(SWThreadHandle *pHandle); + virtual SWThreadHandle *PopThreadFromQueue(); +protected: + IThreader *m_Threader; + IMutex *m_QueueLock; + IMutex *m_StateLock; + IEventSignal *m_PauseSignal; + IEventSignal *m_AddSignal; + IThreadHandle *me; + unsigned int m_think_time; + volatile bool m_Waiting; + volatile bool m_FlushType; +}; + +#endif //_INCLUDE_SOURCEMOD_THREADWORKER_H diff --git a/extensions/threader/thread/WinThreads.cpp b/extensions/threader/thread/WinThreads.cpp new file mode 100644 index 00000000..be444a69 --- /dev/null +++ b/extensions/threader/thread/WinThreads.cpp @@ -0,0 +1,305 @@ +#include "WinThreads.h" +#include "ThreadWorker.h" + +IThreadWorker *WinThreader::MakeWorker(bool threaded) +{ + if (threaded) + { + return new ThreadWorker(this, DEFAULT_THINK_TIME_MS); + } else { + return new BaseWorker(); + } +} + +void WinThreader::DestroyWorker(IThreadWorker *pWorker) +{ + delete pWorker; +} + +void WinThreader::ThreadSleep(unsigned int ms) +{ + Sleep((DWORD)ms); +} + +IMutex *WinThreader::MakeMutex() +{ + HANDLE mutex = CreateMutexA(NULL, FALSE, NULL); + + if (mutex == NULL) + return NULL; + + WinMutex *pMutex = new WinMutex(mutex); + + return pMutex; +} + +IThreadHandle *WinThreader::MakeThread(IThread *pThread, ThreadFlags flags) +{ + ThreadParams defparams; + + defparams.flags = flags; + defparams.prio = ThreadPrio_Normal; + + return MakeThread(pThread, &defparams); +} + +void WinThreader::MakeThread(IThread *pThread) +{ + ThreadParams defparams; + + defparams.flags = Thread_AutoRelease; + defparams.prio = ThreadPrio_Normal; + + MakeThread(pThread, &defparams); +} + +DWORD WINAPI Win32_ThreadGate(LPVOID param) +{ + WinThreader::ThreadHandle *pHandle = + reinterpret_cast(param); + + pHandle->m_run->RunThread(pHandle); + + ThreadParams params; + EnterCriticalSection(&pHandle->m_crit); + pHandle->m_state = Thread_Done; + pHandle->GetParams(¶ms); + LeaveCriticalSection(&pHandle->m_crit); + + pHandle->m_run->OnTerminate(pHandle, false); + if (params.flags & Thread_AutoRelease) + delete pHandle; + + return 0; +} + +void WinThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) +{ + max = ThreadPrio_Maximum; + min = ThreadPrio_Minimum; +} + +ThreadParams g_defparams; +IThreadHandle *WinThreader::MakeThread(IThread *pThread, const ThreadParams *params) +{ + if (params == NULL) + params = &g_defparams; + + WinThreader::ThreadHandle *pHandle = + new WinThreader::ThreadHandle(this, NULL, pThread, params); + + DWORD tid; + pHandle->m_thread = + CreateThread(NULL, 0, &Win32_ThreadGate, (LPVOID)pHandle, CREATE_SUSPENDED, &tid); + + if (!pHandle->m_thread) + { + delete pHandle; + return NULL; + } + + if (pHandle->m_params.prio != ThreadPrio_Normal) + { + pHandle->SetPriority(pHandle->m_params.prio); + } + + if (!(pHandle->m_params.flags & Thread_CreateSuspended)) + { + pHandle->Unpause(); + } + + return pHandle; +} + +IEventSignal *WinThreader::MakeEventSignal() +{ + HANDLE event = CreateEventA(NULL, FALSE, FALSE, NULL); + + if (!event) + return NULL; + + WinEvent *pEvent = new WinEvent(event); + + return pEvent; +} + +/***************** + **** Mutexes **** + *****************/ + +WinThreader::WinMutex::~WinMutex() +{ + if (m_mutex) + { + CloseHandle(m_mutex); + m_mutex = NULL; + } +} + +bool WinThreader::WinMutex::TryLock() +{ + if (!m_mutex) + return false; + + if (WaitForSingleObject(m_mutex, 0) != WAIT_FAILED) + return true; + + return false; +} + +void WinThreader::WinMutex::Lock() +{ + if (!m_mutex) + return; + + WaitForSingleObject(m_mutex, INFINITE); +} + +void WinThreader::WinMutex::Unlock() +{ + if (!m_mutex) + return; + + ReleaseMutex(m_mutex); +} + +void WinThreader::WinMutex::DestroyThis() +{ + delete this; +} + +/****************** + * Thread Handles * + ******************/ + +WinThreader::ThreadHandle::ThreadHandle(IThreader *parent, HANDLE hthread, IThread *run, const ThreadParams *params) : + m_parent(parent), m_thread(hthread), m_run(run), m_params(*params), + m_state(Thread_Paused) +{ + InitializeCriticalSection(&m_crit); +} + +WinThreader::ThreadHandle::~ThreadHandle() +{ + if (m_thread) + { + CloseHandle(m_thread); + m_thread = NULL; + } + DeleteCriticalSection(&m_crit); +} + +bool WinThreader::ThreadHandle::WaitForThread() +{ + if (m_thread == NULL) + return false; + + if (WaitForSingleObject(m_thread, INFINITE) != 0) + return false; + + return true; +} + +ThreadState WinThreader::ThreadHandle::GetState() +{ + ThreadState state; + + EnterCriticalSection(&m_crit); + state = m_state; + LeaveCriticalSection(&m_crit); + + return state; +} + +IThreadCreator *WinThreader::ThreadHandle::Parent() +{ + return m_parent; +} + +void WinThreader::ThreadHandle::DestroyThis() +{ + if (m_params.flags & Thread_AutoRelease) + return; + + delete this; +} + +void WinThreader::ThreadHandle::GetParams(ThreadParams *ptparams) +{ + if (!ptparams) + return; + + *ptparams = m_params; +} + +ThreadPriority WinThreader::ThreadHandle::GetPriority() +{ + return m_params.prio; +} + +bool WinThreader::ThreadHandle::SetPriority(ThreadPriority prio) +{ + if (!m_thread) + return false; + + BOOL res = FALSE; + + if (prio >= ThreadPrio_Maximum) + res = SetThreadPriority(m_thread, THREAD_PRIORITY_HIGHEST); + else if (prio <= ThreadPrio_Minimum) + res = SetThreadPriority(m_thread, THREAD_PRIORITY_LOWEST); + else if (prio == ThreadPrio_Normal) + res = SetThreadPriority(m_thread, THREAD_PRIORITY_NORMAL); + else if (prio == ThreadPrio_High) + res = SetThreadPriority(m_thread, THREAD_PRIORITY_ABOVE_NORMAL); + else if (prio == ThreadPrio_Low) + res = SetThreadPriority(m_thread, THREAD_PRIORITY_BELOW_NORMAL); + + m_params.prio = prio; + + return (res != FALSE); +} + +bool WinThreader::ThreadHandle::Unpause() +{ + if (!m_thread) + return false; + + if (m_state != Thread_Paused) + return false; + + m_state = Thread_Running; + + if (ResumeThread(m_thread) == -1) + { + m_state = Thread_Paused; + return false; + } + + return true; +} + +/***************** + * EVENT SIGNALS * + *****************/ + +WinThreader::WinEvent::~WinEvent() +{ + CloseHandle(m_event); +} + +void WinThreader::WinEvent::Wait() +{ + WaitForSingleObject(m_event, INFINITE); +} + +void WinThreader::WinEvent::Signal() +{ + SetEvent(m_event); +} + +void WinThreader::WinEvent::DestroyThis() +{ + delete this; +} + diff --git a/extensions/threader/thread/WinThreads.h b/extensions/threader/thread/WinThreads.h new file mode 100644 index 00000000..89a640fc --- /dev/null +++ b/extensions/threader/thread/WinThreads.h @@ -0,0 +1,84 @@ +#ifndef _INCLUDE_WINTHREADS_H_ +#define _INCLUDE_WINTHREADS_H_ + +#include +#include "IThreader.h" + +using namespace SourceMod; + +DWORD WINAPI Win32_ThreadGate(LPVOID param); + +class WinThreader : public IThreader +{ +public: + class ThreadHandle : public IThreadHandle + { + friend class WinThreader; + friend DWORD WINAPI Win32_ThreadGate(LPVOID param); + public: + ThreadHandle(IThreader *parent, HANDLE hthread, IThread *run, const ThreadParams *params); + virtual ~ThreadHandle(); + public: + virtual bool WaitForThread(); + virtual void DestroyThis(); + virtual IThreadCreator *Parent(); + virtual void GetParams(ThreadParams *ptparams); + virtual ThreadPriority GetPriority(); + virtual bool SetPriority(ThreadPriority prio); + virtual ThreadState GetState(); + virtual bool Unpause(); + protected: + IThreader *m_parent; //Parent handle + HANDLE m_thread; //Windows HANDLE + ThreadParams m_params; //Current Parameters + IThread *m_run; //Runnable context + ThreadState m_state; //internal state + CRITICAL_SECTION m_crit; + }; + class WinMutex : public IMutex + { + public: + WinMutex(HANDLE mutex) : m_mutex(mutex) + { + }; + virtual ~WinMutex(); + public: + virtual bool TryLock(); + virtual void Lock(); + virtual void Unlock(); + virtual void DestroyThis(); + protected: + HANDLE m_mutex; + }; + class WinEvent : public IEventSignal + { + public: + WinEvent(HANDLE event) : m_event(event) + { + }; + virtual ~WinEvent(); + public: + virtual void Wait(); + virtual void Signal(); + virtual void DestroyThis(); + public: + HANDLE m_event; + }; +public: + IMutex *MakeMutex(); + void MakeThread(IThread *pThread); + IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags); + IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params); + void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min); + void ThreadSleep(unsigned int ms); + IEventSignal *MakeEventSignal(); + IThreadWorker *MakeWorker(bool threaded); + void DestroyWorker(IThreadWorker *pWorker); +}; + +#if defined SM_DEFAULT_THREADER && !defined SM_MAIN_THREADER +#define SM_MAIN_THREADER WinThreader; +typedef class WinThreader MainThreader; +#endif + +#endif //_INCLUDE_WINTHREADS_H_