Replace all uses of AMTL threads with STL threads.

This also rewrites the work loop for threaded queries. It has been
simplified significantly.
This commit is contained in:
David Anderson 2020-05-14 00:10:30 -07:00
parent 15023777f4
commit a253e175bb
16 changed files with 84 additions and 98 deletions

View File

@ -34,17 +34,20 @@
#include "HandleSys.h"
#include "ExtensionSys.h"
#include "PluginSys.h"
#include <chrono>
#include <amtl/am-thread.h>
#include <stdlib.h>
#include <IThreader.h>
#include <bridge/include/ILogger.h>
#include <bridge/include/CoreProvider.h>
using namespace std::chrono_literals;
#define DBPARSE_LEVEL_NONE 0
#define DBPARSE_LEVEL_MAIN 1
#define DBPARSE_LEVEL_DATABASE 2
DBManager g_DBMan;
static bool s_OneTimeThreaderErrorMsg = false;
DBManager::DBManager()
: m_Terminate(false),
@ -377,13 +380,12 @@ void DBManager::KillWorkerThread()
if (m_Worker)
{
{
ke::AutoLock lock(&m_QueueEvent);
std::lock_guard<std::mutex> lock(m_Lock);
m_Terminate = true;
m_QueueEvent.Notify();
m_QueueEvent.notify_all();
}
m_Worker->Join();
m_Worker->join();
m_Worker = nullptr;
s_OneTimeThreaderErrorMsg = false;
m_Terminate = false;
}
}
@ -399,27 +401,17 @@ bool DBManager::AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio)
if (!m_Worker)
{
m_Worker = new ke::Thread([this]() -> void {
m_Worker = ke::NewThread("SM Database Worker", [this]() -> void {
Run();
}, "SM SQL Worker");
if (!m_Worker->Succeeded())
{
if (!s_OneTimeThreaderErrorMsg)
{
logger->LogError("[SM] Unable to create db threader (error unknown)");
s_OneTimeThreaderErrorMsg = true;
}
m_Worker = nullptr;
return false;
}
});
}
/* Add to the queue */
{
ke::AutoLock lock(&m_QueueEvent);
std::lock_guard<std::mutex> lock(m_Lock);
Queue<IDBThreadOperation *> &queue = m_OpQueue.GetQueue(prio);
queue.push(op);
m_QueueEvent.Notify();
m_QueueEvent.notify_one();
}
return true;
@ -450,7 +442,7 @@ void DBManager::Run()
void DBManager::ThreadMain()
{
ke::AutoLock lock(&m_QueueEvent);
std::unique_lock<std::mutex> lock(m_Lock);
while (true) {
// The lock has been acquired. Grab everything we can out of the
@ -458,46 +450,43 @@ void DBManager::ThreadMain()
// we process all operations we can before checking to terminate.
// There's no risk of starvation since the main thread blocks on us
// terminating.
while (true)
{
Queue<IDBThreadOperation *> &queue = m_OpQueue.GetLikelyQueue();
if (queue.empty())
break;
auto queue = &m_OpQueue.GetLikelyQueue();
if (queue->empty()) {
// If the queue is empty and we've been asked to stop, leave now.
if (m_Terminate)
return;
IDBThreadOperation *op = queue.first();
queue.pop();
// Unlock the queue when we run the query, so the main thread can
// keep pumping events. We re-acquire the lock to check for more
// items. It's okay if we terminate while unlocked; the main
// thread would be blocked and we'd need to flush the queue
// anyway, so after we've depleted the queue here, we'll just
// reach the terminate at the top of the loop.
{
ke::AutoUnlock unlock(&m_QueueEvent);
op->RunThreadPart();
ke::AutoLock lock(&m_ThinkLock);
m_ThinkQueue.push(op);
}
if (!m_Terminate)
{
ke::AutoUnlock unlock(&m_QueueEvent);
#ifdef _WIN32
Sleep(20);
#else
usleep(20000);
#endif
}
// Otherwise, wait for something to happen.
m_QueueEvent.wait(lock);
continue;
}
if (m_Terminate)
return;
IDBThreadOperation *op = queue->first();
queue->pop();
// Release the lock and wait for a signal.
m_QueueEvent.Wait();
// Unlock the queue when we run the query, so the main thread can
// keep pumping events. We re-acquire the lock to check for more
// items. It's okay if we terminate while unlocked; the main
// thread would be blocked and we'd need to flush the queue
// anyway, so after we've depleted the queue here, we'll just
// reach the terminate at the top of the loop.
lock.unlock();
op->RunThreadPart();
// Re-acquire the lock and give the data back to the main thread
// immediately. We use a separate lock to minimize game thread
// contention.
{
std::lock_guard<std::mutex> think_lock(m_ThinkLock);
m_ThinkQueue.push(op);
}
// Note that we add a 20ms delay after processing a query. This is
// questionable but the intent is to avoid starving the game thread.
if (!m_Terminate)
std::this_thread::sleep_for(20ms);
lock.lock();
}
}
@ -512,7 +501,7 @@ void DBManager::RunFrame()
/* Dump one thing per-frame so the server stays sane. */
IDBThreadOperation *op;
{
ke::AutoLock lock(&m_ThinkLock);
std::lock_guard<std::mutex> lock(m_ThinkLock);
op = m_ThinkQueue.first();
m_ThinkQueue.pop();
}

View File

@ -38,7 +38,10 @@
#include <sh_list.h>
#include <IThreader.h>
#include <IPluginSys.h>
#include <am-thread-utils.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include "sm_simple_prioqueue.h"
#include <am-refcounting.h>
#include "DatabaseConfBuilder.h"
@ -101,9 +104,10 @@ private:
PrioQueue<IDBThreadOperation *> m_OpQueue;
Queue<IDBThreadOperation *> m_ThinkQueue;
CVector<bool> m_drSafety; /* which drivers are safe? */
ke::AutoPtr<ke::Thread> m_Worker;
ke::ConditionVariable m_QueueEvent;
ke::Mutex m_ThinkLock;
std::unique_ptr<std::thread> m_Worker;
std::condition_variable m_QueueEvent;
std::mutex m_ThinkLock;
std::mutex m_Lock;
bool m_Terminate;
DatabaseConfBuilder m_Builder;

View File

@ -31,6 +31,7 @@
#include <sm_platform.h>
#include <amtl/am-deque.h>
#include <amtl/am-maybe.h>
#include <amtl/am-thread.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
@ -99,9 +100,10 @@ bool CompatWorker::Start()
if (state_ != Worker_Stopped)
return false;
thread_ = std::make_unique<std::thread>([this]() -> void {
thread_ = ke::NewThread("SM CompatWorker Thread", [this]() -> void {
Worker();
});
state_ = Worker_Running;
return true;
}
@ -353,7 +355,7 @@ bool CompatThread::Unpause()
if (thread_)
return false;
thread_ = std::make_unique<std::thread>([this]() -> void {
thread_ = ke::NewThread("SM CompatThread", [this]() -> void {
Run();
});

View File

@ -35,7 +35,6 @@
#include <mutex>
#include <IThreader.h>
#include <am-thread-utils.h>
#include <am-utility.h>
using namespace SourceMod;

View File

@ -297,7 +297,7 @@ void ClientPrefs::DatabaseConnect()
// Need a new scope because of the goto above.
{
AutoLock lock(&queryLock);
std::lock_guard<std::mutex> lock(queryLock);
this->ProcessQueryCache();
}
return;
@ -310,7 +310,7 @@ fatal_fail:
bool ClientPrefs::AddQueryToQueue(TQueryOp *query)
{
{
AutoLock lock(&queryLock);
std::lock_guard<std::mutex> lock(queryLock);
if (!Database)
{
cachedQueries.append(query);
@ -328,8 +328,6 @@ bool ClientPrefs::AddQueryToQueue(TQueryOp *query)
void ClientPrefs::ProcessQueryCache()
{
queryLock.AssertCurrentThreadOwns();
if (!Database)
return;
@ -373,7 +371,8 @@ void ClientPrefs::CatchLateLoadClients()
void ClientPrefs::ClearQueryCache(int serial)
{
AutoLock lock(&queryLock);
std::lock_guard<std::mutex> lock(queryLock);
for (size_t iter = 0; iter < cachedQueries.length(); ++iter)
{
TQueryOp *op = cachedQueries[iter];

View File

@ -37,8 +37,8 @@
#include "smsdk_ext.h"
#include "am-vector.h"
#include <am-thread-utils.h>
#include <am-refcounting.h>
#include <mutex>
char * UTIL_strncpy(char * destination, const char * source, size_t num);
@ -159,7 +159,7 @@ public:
private:
ke::Vector<TQueryOp *> cachedQueries;
ke::Mutex queryLock;
std::mutex queryLock;
IdentityToken_t *identity;
};

View File

@ -277,17 +277,13 @@ IPreparedQuery *MyDatabase::PrepareQuery(const char *query, char *error, size_t
bool MyDatabase::LockForFullAtomicOperation()
{
if (!m_FullLock)
m_FullLock = new ke::Mutex();
m_FullLock->Lock();
m_FullLock.lock();
return true;
}
void MyDatabase::UnlockFromFullAtomicOperation()
{
if (m_FullLock)
m_FullLock->Unlock();
m_FullLock.unlock();
}
IDBDriver *MyDatabase::GetDriver()

View File

@ -32,8 +32,8 @@
#ifndef _INCLUDE_SM_MYSQL_DATABASE_H_
#define _INCLUDE_SM_MYSQL_DATABASE_H_
#include <am-thread-utils.h>
#include <am-refcounting-threadsafe.h>
#include <mutex>
#include "MyDriver.h"
class MyQuery;
@ -70,7 +70,7 @@ public:
const DatabaseInfo &GetInfo();
private:
MYSQL *m_mysql;
ke::AutoPtr<ke::Mutex> m_FullLock;
std::mutex m_FullLock;
/* ---------- */
DatabaseInfo m_Info;

View File

@ -160,7 +160,7 @@ bool CompareField(const char *str1, const char *str2)
IDatabase *MyDriver::Connect(const DatabaseInfo *info, bool persistent, char *error, size_t maxlength)
{
ke::AutoLock lock(&m_Lock);
std::lock_guard<std::mutex> lock(m_Lock);
if (persistent)
{
@ -202,7 +202,7 @@ IDatabase *MyDriver::Connect(const DatabaseInfo *info, bool persistent, char *er
void MyDriver::RemoveFromList(MyDatabase *pdb, bool persistent)
{
ke::AutoLock lock(&m_Lock);
std::lock_guard<std::mutex> lock(m_Lock);
if (persistent)
{
m_PermDbs.remove(pdb);

View File

@ -45,7 +45,8 @@
#include <sh_string.h>
#include <sh_list.h>
#include <am-thread-utils.h>
#include <mutex>
using namespace SourceMod;
using namespace SourceHook;
@ -71,7 +72,7 @@ public:
void Shutdown();
void RemoveFromList(MyDatabase *pdb, bool persistent);
private:
ke::Mutex m_Lock;
std::mutex m_Lock;
Handle_t m_MyHandle;
List<MyDatabase *> m_TempDbs;
List<MyDatabase *> m_PermDbs;

View File

@ -64,17 +64,13 @@ const char *SqDatabase::GetError(int *errorCode/* =NULL */)
bool SqDatabase::LockForFullAtomicOperation()
{
if (!m_FullLock)
m_FullLock = new ke::Mutex();
m_FullLock->Lock();
m_FullLock.lock();
return true;
}
void SqDatabase::UnlockFromFullAtomicOperation()
{
if (m_FullLock)
m_FullLock->Unlock();
m_FullLock.unlock();
}
IDBDriver *SqDatabase::GetDriver()

View File

@ -33,7 +33,7 @@
#define _INCLUDE_SQLITE_SOURCEMOD_DATABASE_H_
#include <am-refcounting-threadsafe.h>
#include <am-thread-utils.h>
#include <mutex>
#include "SqDriver.h"
class SqDatabase
@ -70,7 +70,7 @@ public:
}
private:
sqlite3 *m_sq3;
ke::AutoPtr<ke::Mutex> m_FullLock;
std::mutex m_FullLock;
bool m_Persistent;
String m_LastError;
int m_LastErrorCode;

View File

@ -75,7 +75,7 @@ SqDriver::SqDriver()
// of g_SqDriver in SqDatabase's destructor.
SqDriver::~SqDriver()
{
ke::AutoLock lock(&m_OpenLock);
std::lock_guard<std::mutex> lock(m_OpenLock);
List<SqDbInfo>::iterator iter;
SqDatabase *sqdb;
@ -178,7 +178,7 @@ inline bool IsPathSepChar(char c)
IDatabase *SqDriver::Connect(const DatabaseInfo *info, bool persistent, char *error, size_t maxlength)
{
ke::AutoLock lock(&m_OpenLock);
std::lock_guard<std::mutex> lock(m_OpenLock);
/* Full path to the database file */
char fullpath[PLATFORM_MAX_PATH];
@ -296,7 +296,7 @@ IDatabase *SqDriver::Connect(const DatabaseInfo *info, bool persistent, char *er
void SqDriver::RemovePersistent(IDatabase *pdb)
{
ke::AutoLock lock(&m_OpenLock);
std::lock_guard<std::mutex> lock(m_OpenLock);
List<SqDbInfo>::iterator iter;
for (iter = m_Cache.begin(); iter != m_Cache.end(); iter++)

View File

@ -36,7 +36,7 @@
#include <IDBDriver.h>
#include <sh_list.h>
#include <sh_string.h>
#include <am-thread-utils.h>
#include <mutex>
#include "sqlite-source/sqlite3.h"
using namespace SourceMod;
@ -72,7 +72,7 @@ public:
void RemovePersistent(IDatabase *pdb);
private:
Handle_t m_Handle;
ke::Mutex m_OpenLock;
std::mutex m_OpenLock;
List<SqDbInfo> m_Cache;
bool m_bThreadSafe;
bool m_bShutdown;

@ -1 +1 @@
Subproject commit eb7b6ba084e13c50f7c2c53b285e6e6af44accd9
Subproject commit 83c4441b7ede45dfa447bdef614e924c4b7ffea9

@ -1 +1 @@
Subproject commit 305b59efbf97875da0ebfd46038fdfe519dbe077
Subproject commit 49f94603972956a85dab61da475e92795ead40a5