Rewrite DBManager to use the new synchronization primitives (bug 5870, r=fyren).

This commit is contained in:
David Anderson 2013-08-21 00:01:28 -07:00
parent b972ea4781
commit 754a6e1177
3 changed files with 87 additions and 92 deletions

View File

@ -28,6 +28,10 @@ for i in SM.sdkInfo:
binary = Cpp.LibraryBuilder(name, AMBuild, extension, compiler) binary = Cpp.LibraryBuilder(name, AMBuild, extension, compiler)
SM.PreSetupHL2Job(extension, binary, i) SM.PreSetupHL2Job(extension, binary, i)
if AMBuild.target['platform'] == 'linux':
compiler['POSTLINKFLAGS'].append('-lpthread')
compiler['POSTLINKFLAGS'].append('-lrt')
if i in ['csgo', 'dota']: if i in ['csgo', 'dota']:
if AMBuild.target['platform'] == 'linux': if AMBuild.target['platform'] == 'linux':
link = os.path.join(os.path.join(AMBuild.outputFolder, extension.workFolder), 'libprotobuf.a') link = os.path.join(os.path.join(AMBuild.outputFolder, extension.workFolder), 'libprotobuf.a')

View File

@ -1,5 +1,5 @@
/** /**
* vim: set ts=4 : * vim: set ts=4 sw=4 tw=99 noet :
* ============================================================================= * =============================================================================
* SourceMod * SourceMod
* Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved.
@ -45,7 +45,10 @@ DBManager g_DBMan;
static bool s_OneTimeThreaderErrorMsg = false; static bool s_OneTimeThreaderErrorMsg = false;
DBManager::DBManager() DBManager::DBManager()
: m_ParseLevel(0), m_ParseState(0), m_pDefault(NULL) : m_Terminate(false),
m_ParseLevel(0),
m_ParseState(0),
m_pDefault(NULL)
{ {
} }
@ -64,10 +67,6 @@ void DBManager::OnSourceModAllInitialized()
g_SourceMod.BuildPath(Path_SM, m_Filename, sizeof(m_Filename), "configs/databases.cfg"); g_SourceMod.BuildPath(Path_SM, m_Filename, sizeof(m_Filename), "configs/databases.cfg");
m_pConfigLock = g_pThreader->MakeMutex();
m_pThinkLock = g_pThreader->MakeMutex();
m_pQueueLock = g_pThreader->MakeMutex();
scripts->AddPluginsListener(this); scripts->AddPluginsListener(this);
} }
@ -80,7 +79,7 @@ void DBManager::OnSourceModLevelChange(const char *mapName)
* This way the thread's search won't be searching through a * This way the thread's search won't be searching through a
* potentially empty/corrupt list, which would be very bad. * potentially empty/corrupt list, which would be very bad.
*/ */
m_pConfigLock->Lock(); ke::AutoLock lock(&m_ConfigLock);
if ((err = textparsers->ParseFile_SMC(m_Filename, this, &states)) != SMCError_Okay) if ((err = textparsers->ParseFile_SMC(m_Filename, this, &states)) != SMCError_Okay)
{ {
g_Logger.LogError("[SM] Detected parse error(s) in file \"%s\"", m_Filename); g_Logger.LogError("[SM] Detected parse error(s) in file \"%s\"", m_Filename);
@ -90,16 +89,12 @@ void DBManager::OnSourceModLevelChange(const char *mapName)
g_Logger.LogError("[SM] Line %d: %s", states.line, txt); g_Logger.LogError("[SM] Line %d: %s", states.line, txt);
} }
} }
m_pConfigLock->Unlock();
} }
void DBManager::OnSourceModShutdown() void DBManager::OnSourceModShutdown()
{ {
KillWorkerThread(); KillWorkerThread();
scripts->RemovePluginsListener(this); scripts->RemovePluginsListener(this);
m_pConfigLock->DestroyThis();
m_pThinkLock->DestroyThis();
m_pQueueLock->DestroyThis();
handlesys->RemoveType(m_DatabaseType, g_pCoreIdent); handlesys->RemoveType(m_DatabaseType, g_pCoreIdent);
handlesys->RemoveType(m_DriverType, g_pCoreIdent); handlesys->RemoveType(m_DriverType, g_pCoreIdent);
ClearConfigs(); ClearConfigs();
@ -502,12 +497,17 @@ IDBDriver *DBManager::FindOrLoadDriver(const char *name)
void DBManager::KillWorkerThread() void DBManager::KillWorkerThread()
{ {
if (m_pWorker) if (m_Worker)
{ {
m_pWorker->Stop(false); {
g_pThreader->DestroyWorker(m_pWorker); ke::AutoLock lock(&m_QueueEvent);
m_pWorker = NULL; m_Terminate = true;
m_QueueEvent.Notify();
}
m_Worker->Join();
m_Worker = NULL;
s_OneTimeThreaderErrorMsg = false; s_OneTimeThreaderErrorMsg = false;
m_Terminate = false;
} }
} }
@ -520,99 +520,95 @@ bool DBManager::AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio)
return false; return false;
} }
if (!m_pWorker) if (!m_Worker)
{ {
m_pWorker = g_pThreader->MakeWorker(this, true); m_Worker = new ke::Thread(this, "SM SQL Worker");
if (!m_pWorker) if (!m_Worker->Succeeded())
{ {
if (!s_OneTimeThreaderErrorMsg) if (!s_OneTimeThreaderErrorMsg)
{ {
g_Logger.LogError("[SM] Unable to create db threader (error unknown)"); g_Logger.LogError("[SM] Unable to create db threader (error unknown)");
s_OneTimeThreaderErrorMsg = true; s_OneTimeThreaderErrorMsg = true;
} }
return false; m_Worker = NULL;
}
if (!m_pWorker->Start())
{
if (!s_OneTimeThreaderErrorMsg)
{
g_Logger.LogError("[SM] Unable to start db threader (error unknown)");
s_OneTimeThreaderErrorMsg = true;
}
g_pThreader->DestroyWorker(m_pWorker);
m_pWorker = NULL;
return false; return false;
} }
} }
/* Add to the queue */ /* Add to the queue */
{ {
m_pQueueLock->Lock(); ke::AutoLock lock(&m_QueueEvent);
Queue<IDBThreadOperation *> &queue = m_OpQueue.GetQueue(prio); Queue<IDBThreadOperation *> &queue = m_OpQueue.GetQueue(prio);
queue.push(op); queue.push(op);
m_pQueueLock->Unlock(); m_QueueEvent.Notify();
} }
/* Make the thread */
m_pWorker->MakeThread(this);
return true; return true;
} }
void DBManager::OnWorkerStart(IThreadWorker *pWorker) void DBManager::Run()
{ {
m_drSafety.clear(); // Initialize DB threadsafety.
for (size_t i=0; i<m_drivers.size(); i++) for (size_t i=0; i < m_drivers.size(); i++)
{ {
if (m_drivers[i]->IsThreadSafe()) if (m_drivers[i]->IsThreadSafe())
{
m_drSafety.push_back(m_drivers[i]->InitializeThreadSafety()); m_drSafety.push_back(m_drivers[i]->InitializeThreadSafety());
} else { else
m_drSafety.push_back(false); m_drSafety.push_back(false);
}
} }
}
void DBManager::OnWorkerStop(IThreadWorker *pWorker) // Run actual worker thread logic.
{ ThreadMain();
// Shutdown DB threadsafety.
for (size_t i=0; i<m_drivers.size(); i++) for (size_t i=0; i<m_drivers.size(); i++)
{ {
if (m_drSafety[i]) if (m_drSafety[i])
{
m_drivers[i]->ShutdownThreadSafety(); m_drivers[i]->ShutdownThreadSafety();
}
} }
m_drSafety.clear(); m_drSafety.clear();
} }
void DBManager::RunThread(IThreadHandle *pThread) void DBManager::ThreadMain()
{ {
IDBThreadOperation *op = NULL; ke::AutoLock lock(&m_QueueEvent);
/* Get something from the queue */ while (true) {
{ // The lock has been acquired. Grab everything we can out of the
m_pQueueLock->Lock(); // queue. Since we want to flush the queue even if we're terminated,
Queue<IDBThreadOperation *> &queue = m_OpQueue.GetLikelyQueue(); // we process all operations we can before checking to terminate.
if (!queue.empty()) // There's no risk of starvation since the main thread blocks on us
// terminating.
while (true)
{ {
op = queue.first(); Queue<IDBThreadOperation *> &queue = m_OpQueue.GetLikelyQueue();
if (queue.empty())
break;
IDBThreadOperation *op = queue.first();
queue.pop(); queue.pop();
// Unlock the queue when we run the query, so the main thread can
// keep pumping events. We re-acquire the lock to check for more
// items. It's okay if we terminate while unlocked; the main
// thread would be blocked and we'd need to flush the queue
// anyway, so after we've depleted the queue here, we'll just
// reach the terminate at the top of the loop.
{
ke::AutoUnlock unlock(&m_QueueEvent);
op->RunThreadPart();
ke::AutoLock lock(&m_ThinkLock);
m_ThinkQueue.push(op);
}
} }
m_pQueueLock->Unlock();
if (m_Terminate)
return;
// Release the lock and wait for a signal.
m_QueueEvent.Wait();
} }
/* whoa. hi. did we get something? we should have. */
if (!op)
{
/* wtf? */
return;
}
op->RunThreadPart();
m_pThinkLock->Lock();
m_ThinkQueue.push(op);
m_pThinkLock->Unlock();
} }
void DBManager::RunFrame() void DBManager::RunFrame()
@ -624,19 +620,16 @@ void DBManager::RunFrame()
} }
/* Dump one thing per-frame so the server stays sane. */ /* Dump one thing per-frame so the server stays sane. */
m_pThinkLock->Lock(); IDBThreadOperation *op;
IDBThreadOperation *op = m_ThinkQueue.first(); {
m_ThinkQueue.pop(); ke::AutoLock lock(&m_ThinkLock);
m_pThinkLock->Unlock(); op = m_ThinkQueue.first();
m_ThinkQueue.pop();
}
op->RunThinkPart(); op->RunThinkPart();
op->Destroy(); op->Destroy();
} }
void DBManager::OnTerminate(IThreadHandle *pThread, bool cancel)
{
/* Do nothing */
}
void DBManager::OnSourceModIdentityDropped(IdentityToken_t *pToken) void DBManager::OnSourceModIdentityDropped(IdentityToken_t *pToken)
{ {
s_pAddBlock = pToken; s_pAddBlock = pToken;
@ -712,12 +705,12 @@ void DBManager::OnPluginUnloaded(IPlugin *plugin)
void DBManager::LockConfig() void DBManager::LockConfig()
{ {
m_pConfigLock->Lock(); m_ConfigLock.Lock();
} }
void DBManager::UnlockConfig() void DBManager::UnlockConfig()
{ {
m_pConfigLock->Unlock(); m_ConfigLock.Unlock();
} }
const char *DBManager::GetDefaultDriverName() const char *DBManager::GetDefaultDriverName()

View File

@ -1,5 +1,5 @@
/** /**
* vim: set ts=4 : * vim: set ts=4 sw=4 tw=99 noet :
* ============================================================================= * =============================================================================
* SourceMod * SourceMod
* Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved.
@ -41,6 +41,7 @@
#include "sm_memtable.h" #include "sm_memtable.h"
#include <IThreader.h> #include <IThreader.h>
#include <IPluginSys.h> #include <IPluginSys.h>
#include <ke_thread_utils.h>
#include "sm_simple_prioqueue.h" #include "sm_simple_prioqueue.h"
using namespace SourceHook; using namespace SourceHook;
@ -65,9 +66,8 @@ class DBManager :
public SMGlobalClass, public SMGlobalClass,
public IHandleTypeDispatch, public IHandleTypeDispatch,
public ITextListener_SMC, public ITextListener_SMC,
public IThread, public IPluginsListener,
public IThreadWorkerCallbacks, public ke::IRunnable
public IPluginsListener
{ {
public: public:
DBManager(); DBManager();
@ -98,12 +98,9 @@ public: //ITextListener_SMC
SMCResult ReadSMC_KeyValue(const SMCStates *states, const char *key, const char *value); SMCResult ReadSMC_KeyValue(const SMCStates *states, const char *key, const char *value);
SMCResult ReadSMC_LeavingSection(const SMCStates *states); SMCResult ReadSMC_LeavingSection(const SMCStates *states);
void ReadSMC_ParseEnd(bool halted, bool failed); void ReadSMC_ParseEnd(bool halted, bool failed);
public: //IThread public: //ke::IRunnable
void RunThread(IThreadHandle *pThread); void Run();
void OnTerminate(IThreadHandle *pThread, bool cancel); void ThreadMain();
public: //IThreadWorkerCallbacks
void OnWorkerStart(IThreadWorker *pWorker);
void OnWorkerStop(IThreadWorker *pWorker);
public: //IPluginsListener public: //IPluginsListener
void OnPluginUnloaded(IPlugin *plugin); void OnPluginUnloaded(IPlugin *plugin);
public: public:
@ -129,10 +126,11 @@ private:
PrioQueue<IDBThreadOperation *> m_OpQueue; PrioQueue<IDBThreadOperation *> m_OpQueue;
Queue<IDBThreadOperation *> m_ThinkQueue; Queue<IDBThreadOperation *> m_ThinkQueue;
CVector<bool> m_drSafety; /* which drivers are safe? */ CVector<bool> m_drSafety; /* which drivers are safe? */
IThreadWorker *m_pWorker; /* Worker thread object */ ke::AutoPtr<ke::Thread> m_Worker;
IMutex *m_pConfigLock; /* Configuration lock */ ke::ConditionVariable m_QueueEvent;
IMutex *m_pQueueLock; /* Queue safety lock */ ke::Mutex m_ConfigLock;
IMutex *m_pThinkLock; /* Think-queue lock */ ke::Mutex m_ThinkLock;
bool m_Terminate;
List<ConfDbInfo *> m_confs; List<ConfDbInfo *> m_confs;
HandleType_t m_DriverType; HandleType_t m_DriverType;