From 754a6e117765882045f131d7f047d6b881ef06e3 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 21 Aug 2013 00:01:28 -0700 Subject: [PATCH] Rewrite DBManager to use the new synchronization primitives (bug 5870, r=fyren). --- core/AMBuilder | 4 ++ core/Database.cpp | 149 ++++++++++++++++++++++------------------------ core/Database.h | 26 ++++---- 3 files changed, 87 insertions(+), 92 deletions(-) diff --git a/core/AMBuilder b/core/AMBuilder index f5760941..12c2db2a 100644 --- a/core/AMBuilder +++ b/core/AMBuilder @@ -28,6 +28,10 @@ for i in SM.sdkInfo: binary = Cpp.LibraryBuilder(name, AMBuild, extension, compiler) SM.PreSetupHL2Job(extension, binary, i) + if AMBuild.target['platform'] == 'linux': + compiler['POSTLINKFLAGS'].append('-lpthread') + compiler['POSTLINKFLAGS'].append('-lrt') + if i in ['csgo', 'dota']: if AMBuild.target['platform'] == 'linux': link = os.path.join(os.path.join(AMBuild.outputFolder, extension.workFolder), 'libprotobuf.a') diff --git a/core/Database.cpp b/core/Database.cpp index fe267f1f..37bbf6e5 100644 --- a/core/Database.cpp +++ b/core/Database.cpp @@ -1,5 +1,5 @@ /** - * vim: set ts=4 : + * vim: set ts=4 sw=4 tw=99 noet : * ============================================================================= * SourceMod * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. @@ -45,7 +45,10 @@ DBManager g_DBMan; static bool s_OneTimeThreaderErrorMsg = false; DBManager::DBManager() -: m_ParseLevel(0), m_ParseState(0), m_pDefault(NULL) + : m_Terminate(false), + m_ParseLevel(0), + m_ParseState(0), + m_pDefault(NULL) { } @@ -64,10 +67,6 @@ void DBManager::OnSourceModAllInitialized() g_SourceMod.BuildPath(Path_SM, m_Filename, sizeof(m_Filename), "configs/databases.cfg"); - m_pConfigLock = g_pThreader->MakeMutex(); - m_pThinkLock = g_pThreader->MakeMutex(); - m_pQueueLock = g_pThreader->MakeMutex(); - scripts->AddPluginsListener(this); } @@ -80,7 +79,7 @@ void DBManager::OnSourceModLevelChange(const char *mapName) * This way the thread's search won't be searching through a * potentially empty/corrupt list, which would be very bad. */ - m_pConfigLock->Lock(); + ke::AutoLock lock(&m_ConfigLock); if ((err = textparsers->ParseFile_SMC(m_Filename, this, &states)) != SMCError_Okay) { g_Logger.LogError("[SM] Detected parse error(s) in file \"%s\"", m_Filename); @@ -90,16 +89,12 @@ void DBManager::OnSourceModLevelChange(const char *mapName) g_Logger.LogError("[SM] Line %d: %s", states.line, txt); } } - m_pConfigLock->Unlock(); } void DBManager::OnSourceModShutdown() { KillWorkerThread(); scripts->RemovePluginsListener(this); - m_pConfigLock->DestroyThis(); - m_pThinkLock->DestroyThis(); - m_pQueueLock->DestroyThis(); handlesys->RemoveType(m_DatabaseType, g_pCoreIdent); handlesys->RemoveType(m_DriverType, g_pCoreIdent); ClearConfigs(); @@ -502,12 +497,17 @@ IDBDriver *DBManager::FindOrLoadDriver(const char *name) void DBManager::KillWorkerThread() { - if (m_pWorker) + if (m_Worker) { - m_pWorker->Stop(false); - g_pThreader->DestroyWorker(m_pWorker); - m_pWorker = NULL; + { + ke::AutoLock lock(&m_QueueEvent); + m_Terminate = true; + m_QueueEvent.Notify(); + } + m_Worker->Join(); + m_Worker = NULL; s_OneTimeThreaderErrorMsg = false; + m_Terminate = false; } } @@ -520,99 +520,95 @@ bool DBManager::AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio) return false; } - if (!m_pWorker) + if (!m_Worker) { - m_pWorker = g_pThreader->MakeWorker(this, true); - if (!m_pWorker) + m_Worker = new ke::Thread(this, "SM SQL Worker"); + if (!m_Worker->Succeeded()) { if (!s_OneTimeThreaderErrorMsg) { g_Logger.LogError("[SM] Unable to create db threader (error unknown)"); s_OneTimeThreaderErrorMsg = true; } - return false; - } - if (!m_pWorker->Start()) - { - if (!s_OneTimeThreaderErrorMsg) - { - g_Logger.LogError("[SM] Unable to start db threader (error unknown)"); - s_OneTimeThreaderErrorMsg = true; - } - g_pThreader->DestroyWorker(m_pWorker); - m_pWorker = NULL; + m_Worker = NULL; return false; } } /* Add to the queue */ { - m_pQueueLock->Lock(); + ke::AutoLock lock(&m_QueueEvent); Queue &queue = m_OpQueue.GetQueue(prio); queue.push(op); - m_pQueueLock->Unlock(); + m_QueueEvent.Notify(); } - /* Make the thread */ - m_pWorker->MakeThread(this); - return true; } -void DBManager::OnWorkerStart(IThreadWorker *pWorker) +void DBManager::Run() { - m_drSafety.clear(); - for (size_t i=0; iIsThreadSafe()) - { m_drSafety.push_back(m_drivers[i]->InitializeThreadSafety()); - } else { + else m_drSafety.push_back(false); - } } -} -void DBManager::OnWorkerStop(IThreadWorker *pWorker) -{ + // Run actual worker thread logic. + ThreadMain(); + + // Shutdown DB threadsafety. for (size_t i=0; iShutdownThreadSafety(); - } } m_drSafety.clear(); } -void DBManager::RunThread(IThreadHandle *pThread) +void DBManager::ThreadMain() { - IDBThreadOperation *op = NULL; + ke::AutoLock lock(&m_QueueEvent); - /* Get something from the queue */ - { - m_pQueueLock->Lock(); - Queue &queue = m_OpQueue.GetLikelyQueue(); - if (!queue.empty()) + while (true) { + // The lock has been acquired. Grab everything we can out of the + // queue. Since we want to flush the queue even if we're terminated, + // we process all operations we can before checking to terminate. + // There's no risk of starvation since the main thread blocks on us + // terminating. + while (true) { - op = queue.first(); + Queue &queue = m_OpQueue.GetLikelyQueue(); + if (queue.empty()) + break; + + IDBThreadOperation *op = queue.first(); queue.pop(); + + // Unlock the queue when we run the query, so the main thread can + // keep pumping events. We re-acquire the lock to check for more + // items. It's okay if we terminate while unlocked; the main + // thread would be blocked and we'd need to flush the queue + // anyway, so after we've depleted the queue here, we'll just + // reach the terminate at the top of the loop. + { + ke::AutoUnlock unlock(&m_QueueEvent); + op->RunThreadPart(); + + ke::AutoLock lock(&m_ThinkLock); + m_ThinkQueue.push(op); + } } - m_pQueueLock->Unlock(); + + if (m_Terminate) + return; + + // Release the lock and wait for a signal. + m_QueueEvent.Wait(); } - - /* whoa. hi. did we get something? we should have. */ - if (!op) - { - /* wtf? */ - return; - } - - op->RunThreadPart(); - - m_pThinkLock->Lock(); - m_ThinkQueue.push(op); - m_pThinkLock->Unlock(); } void DBManager::RunFrame() @@ -624,19 +620,16 @@ void DBManager::RunFrame() } /* Dump one thing per-frame so the server stays sane. */ - m_pThinkLock->Lock(); - IDBThreadOperation *op = m_ThinkQueue.first(); - m_ThinkQueue.pop(); - m_pThinkLock->Unlock(); + IDBThreadOperation *op; + { + ke::AutoLock lock(&m_ThinkLock); + op = m_ThinkQueue.first(); + m_ThinkQueue.pop(); + } op->RunThinkPart(); op->Destroy(); } -void DBManager::OnTerminate(IThreadHandle *pThread, bool cancel) -{ - /* Do nothing */ -} - void DBManager::OnSourceModIdentityDropped(IdentityToken_t *pToken) { s_pAddBlock = pToken; @@ -712,12 +705,12 @@ void DBManager::OnPluginUnloaded(IPlugin *plugin) void DBManager::LockConfig() { - m_pConfigLock->Lock(); + m_ConfigLock.Lock(); } void DBManager::UnlockConfig() { - m_pConfigLock->Unlock(); + m_ConfigLock.Unlock(); } const char *DBManager::GetDefaultDriverName() diff --git a/core/Database.h b/core/Database.h index 9e2b9cf7..f330f505 100644 --- a/core/Database.h +++ b/core/Database.h @@ -1,5 +1,5 @@ /** - * vim: set ts=4 : + * vim: set ts=4 sw=4 tw=99 noet : * ============================================================================= * SourceMod * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. @@ -41,6 +41,7 @@ #include "sm_memtable.h" #include #include +#include #include "sm_simple_prioqueue.h" using namespace SourceHook; @@ -65,9 +66,8 @@ class DBManager : public SMGlobalClass, public IHandleTypeDispatch, public ITextListener_SMC, - public IThread, - public IThreadWorkerCallbacks, - public IPluginsListener + public IPluginsListener, + public ke::IRunnable { public: DBManager(); @@ -98,12 +98,9 @@ public: //ITextListener_SMC SMCResult ReadSMC_KeyValue(const SMCStates *states, const char *key, const char *value); SMCResult ReadSMC_LeavingSection(const SMCStates *states); void ReadSMC_ParseEnd(bool halted, bool failed); -public: //IThread - void RunThread(IThreadHandle *pThread); - void OnTerminate(IThreadHandle *pThread, bool cancel); -public: //IThreadWorkerCallbacks - void OnWorkerStart(IThreadWorker *pWorker); - void OnWorkerStop(IThreadWorker *pWorker); +public: //ke::IRunnable + void Run(); + void ThreadMain(); public: //IPluginsListener void OnPluginUnloaded(IPlugin *plugin); public: @@ -129,10 +126,11 @@ private: PrioQueue m_OpQueue; Queue m_ThinkQueue; CVector m_drSafety; /* which drivers are safe? */ - IThreadWorker *m_pWorker; /* Worker thread object */ - IMutex *m_pConfigLock; /* Configuration lock */ - IMutex *m_pQueueLock; /* Queue safety lock */ - IMutex *m_pThinkLock; /* Think-queue lock */ + ke::AutoPtr m_Worker; + ke::ConditionVariable m_QueueEvent; + ke::Mutex m_ConfigLock; + ke::Mutex m_ThinkLock; + bool m_Terminate; List m_confs; HandleType_t m_DriverType;