From a253e175bb9c3aebc4177b990b76cfd33e3931ca Mon Sep 17 00:00:00 2001 From: David Anderson Date: Thu, 14 May 2020 00:10:30 -0700 Subject: [PATCH] Replace all uses of AMTL threads with STL threads. This also rewrites the work loop for threaded queries. It has been simplified significantly. --- core/logic/Database.cpp | 103 +++++++++++------------- core/logic/Database.h | 12 ++- core/logic/ThreadSupport.cpp | 6 +- core/logic/ThreadSupport.h | 1 - extensions/clientprefs/extension.cpp | 9 +-- extensions/clientprefs/extension.h | 4 +- extensions/mysql/mysql/MyDatabase.cpp | 8 +- extensions/mysql/mysql/MyDatabase.h | 4 +- extensions/mysql/mysql/MyDriver.cpp | 4 +- extensions/mysql/mysql/MyDriver.h | 5 +- extensions/sqlite/driver/SqDatabase.cpp | 8 +- extensions/sqlite/driver/SqDatabase.h | 4 +- extensions/sqlite/driver/SqDriver.cpp | 6 +- extensions/sqlite/driver/SqDriver.h | 4 +- public/amtl | 2 +- sourcepawn | 2 +- 16 files changed, 84 insertions(+), 98 deletions(-) diff --git a/core/logic/Database.cpp b/core/logic/Database.cpp index c805765c..562ceca6 100644 --- a/core/logic/Database.cpp +++ b/core/logic/Database.cpp @@ -34,17 +34,20 @@ #include "HandleSys.h" #include "ExtensionSys.h" #include "PluginSys.h" +#include +#include #include #include #include #include +using namespace std::chrono_literals; + #define DBPARSE_LEVEL_NONE 0 #define DBPARSE_LEVEL_MAIN 1 #define DBPARSE_LEVEL_DATABASE 2 DBManager g_DBMan; -static bool s_OneTimeThreaderErrorMsg = false; DBManager::DBManager() : m_Terminate(false), @@ -377,13 +380,12 @@ void DBManager::KillWorkerThread() if (m_Worker) { { - ke::AutoLock lock(&m_QueueEvent); + std::lock_guard lock(m_Lock); m_Terminate = true; - m_QueueEvent.Notify(); + m_QueueEvent.notify_all(); } - m_Worker->Join(); + m_Worker->join(); m_Worker = nullptr; - s_OneTimeThreaderErrorMsg = false; m_Terminate = false; } } @@ -399,27 +401,17 @@ bool DBManager::AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio) if (!m_Worker) { - m_Worker = new ke::Thread([this]() -> void { + m_Worker = ke::NewThread("SM Database Worker", [this]() -> void { Run(); - }, "SM SQL Worker"); - if (!m_Worker->Succeeded()) - { - if (!s_OneTimeThreaderErrorMsg) - { - logger->LogError("[SM] Unable to create db threader (error unknown)"); - s_OneTimeThreaderErrorMsg = true; - } - m_Worker = nullptr; - return false; - } + }); } /* Add to the queue */ { - ke::AutoLock lock(&m_QueueEvent); + std::lock_guard lock(m_Lock); Queue &queue = m_OpQueue.GetQueue(prio); queue.push(op); - m_QueueEvent.Notify(); + m_QueueEvent.notify_one(); } return true; @@ -450,7 +442,7 @@ void DBManager::Run() void DBManager::ThreadMain() { - ke::AutoLock lock(&m_QueueEvent); + std::unique_lock lock(m_Lock); while (true) { // The lock has been acquired. Grab everything we can out of the @@ -458,46 +450,43 @@ void DBManager::ThreadMain() // we process all operations we can before checking to terminate. // There's no risk of starvation since the main thread blocks on us // terminating. - while (true) - { - Queue &queue = m_OpQueue.GetLikelyQueue(); - if (queue.empty()) - break; + auto queue = &m_OpQueue.GetLikelyQueue(); + if (queue->empty()) { + // If the queue is empty and we've been asked to stop, leave now. + if (m_Terminate) + return; - IDBThreadOperation *op = queue.first(); - queue.pop(); - - // Unlock the queue when we run the query, so the main thread can - // keep pumping events. We re-acquire the lock to check for more - // items. It's okay if we terminate while unlocked; the main - // thread would be blocked and we'd need to flush the queue - // anyway, so after we've depleted the queue here, we'll just - // reach the terminate at the top of the loop. - { - ke::AutoUnlock unlock(&m_QueueEvent); - op->RunThreadPart(); - - ke::AutoLock lock(&m_ThinkLock); - m_ThinkQueue.push(op); - } - - - if (!m_Terminate) - { - ke::AutoUnlock unlock(&m_QueueEvent); -#ifdef _WIN32 - Sleep(20); -#else - usleep(20000); -#endif - } + // Otherwise, wait for something to happen. + m_QueueEvent.wait(lock); + continue; } - if (m_Terminate) - return; + IDBThreadOperation *op = queue->first(); + queue->pop(); - // Release the lock and wait for a signal. - m_QueueEvent.Wait(); + // Unlock the queue when we run the query, so the main thread can + // keep pumping events. We re-acquire the lock to check for more + // items. It's okay if we terminate while unlocked; the main + // thread would be blocked and we'd need to flush the queue + // anyway, so after we've depleted the queue here, we'll just + // reach the terminate at the top of the loop. + lock.unlock(); + op->RunThreadPart(); + + // Re-acquire the lock and give the data back to the main thread + // immediately. We use a separate lock to minimize game thread + // contention. + { + std::lock_guard think_lock(m_ThinkLock); + m_ThinkQueue.push(op); + } + + // Note that we add a 20ms delay after processing a query. This is + // questionable but the intent is to avoid starving the game thread. + if (!m_Terminate) + std::this_thread::sleep_for(20ms); + + lock.lock(); } } @@ -512,7 +501,7 @@ void DBManager::RunFrame() /* Dump one thing per-frame so the server stays sane. */ IDBThreadOperation *op; { - ke::AutoLock lock(&m_ThinkLock); + std::lock_guard lock(m_ThinkLock); op = m_ThinkQueue.first(); m_ThinkQueue.pop(); } diff --git a/core/logic/Database.h b/core/logic/Database.h index c64706f2..f32342b3 100644 --- a/core/logic/Database.h +++ b/core/logic/Database.h @@ -38,7 +38,10 @@ #include #include #include -#include +#include +#include +#include +#include #include "sm_simple_prioqueue.h" #include #include "DatabaseConfBuilder.h" @@ -101,9 +104,10 @@ private: PrioQueue m_OpQueue; Queue m_ThinkQueue; CVector m_drSafety; /* which drivers are safe? */ - ke::AutoPtr m_Worker; - ke::ConditionVariable m_QueueEvent; - ke::Mutex m_ThinkLock; + std::unique_ptr m_Worker; + std::condition_variable m_QueueEvent; + std::mutex m_ThinkLock; + std::mutex m_Lock; bool m_Terminate; DatabaseConfBuilder m_Builder; diff --git a/core/logic/ThreadSupport.cpp b/core/logic/ThreadSupport.cpp index 1c4a35c5..29f2fd5b 100644 --- a/core/logic/ThreadSupport.cpp +++ b/core/logic/ThreadSupport.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -99,9 +100,10 @@ bool CompatWorker::Start() if (state_ != Worker_Stopped) return false; - thread_ = std::make_unique([this]() -> void { + thread_ = ke::NewThread("SM CompatWorker Thread", [this]() -> void { Worker(); }); + state_ = Worker_Running; return true; } @@ -353,7 +355,7 @@ bool CompatThread::Unpause() if (thread_) return false; - thread_ = std::make_unique([this]() -> void { + thread_ = ke::NewThread("SM CompatThread", [this]() -> void { Run(); }); diff --git a/core/logic/ThreadSupport.h b/core/logic/ThreadSupport.h index 4246f20f..5ddb7dc5 100644 --- a/core/logic/ThreadSupport.h +++ b/core/logic/ThreadSupport.h @@ -35,7 +35,6 @@ #include #include -#include #include using namespace SourceMod; diff --git a/extensions/clientprefs/extension.cpp b/extensions/clientprefs/extension.cpp index 62231803..e995f38a 100644 --- a/extensions/clientprefs/extension.cpp +++ b/extensions/clientprefs/extension.cpp @@ -297,7 +297,7 @@ void ClientPrefs::DatabaseConnect() // Need a new scope because of the goto above. { - AutoLock lock(&queryLock); + std::lock_guard lock(queryLock); this->ProcessQueryCache(); } return; @@ -310,7 +310,7 @@ fatal_fail: bool ClientPrefs::AddQueryToQueue(TQueryOp *query) { { - AutoLock lock(&queryLock); + std::lock_guard lock(queryLock); if (!Database) { cachedQueries.append(query); @@ -328,8 +328,6 @@ bool ClientPrefs::AddQueryToQueue(TQueryOp *query) void ClientPrefs::ProcessQueryCache() { - queryLock.AssertCurrentThreadOwns(); - if (!Database) return; @@ -373,7 +371,8 @@ void ClientPrefs::CatchLateLoadClients() void ClientPrefs::ClearQueryCache(int serial) { - AutoLock lock(&queryLock); + std::lock_guard lock(queryLock); + for (size_t iter = 0; iter < cachedQueries.length(); ++iter) { TQueryOp *op = cachedQueries[iter]; diff --git a/extensions/clientprefs/extension.h b/extensions/clientprefs/extension.h index 1ca81950..dde6d290 100644 --- a/extensions/clientprefs/extension.h +++ b/extensions/clientprefs/extension.h @@ -37,8 +37,8 @@ #include "smsdk_ext.h" #include "am-vector.h" -#include #include +#include char * UTIL_strncpy(char * destination, const char * source, size_t num); @@ -159,7 +159,7 @@ public: private: ke::Vector cachedQueries; - ke::Mutex queryLock; + std::mutex queryLock; IdentityToken_t *identity; }; diff --git a/extensions/mysql/mysql/MyDatabase.cpp b/extensions/mysql/mysql/MyDatabase.cpp index e5cddf65..f5803363 100644 --- a/extensions/mysql/mysql/MyDatabase.cpp +++ b/extensions/mysql/mysql/MyDatabase.cpp @@ -277,17 +277,13 @@ IPreparedQuery *MyDatabase::PrepareQuery(const char *query, char *error, size_t bool MyDatabase::LockForFullAtomicOperation() { - if (!m_FullLock) - m_FullLock = new ke::Mutex(); - - m_FullLock->Lock(); + m_FullLock.lock(); return true; } void MyDatabase::UnlockFromFullAtomicOperation() { - if (m_FullLock) - m_FullLock->Unlock(); + m_FullLock.unlock(); } IDBDriver *MyDatabase::GetDriver() diff --git a/extensions/mysql/mysql/MyDatabase.h b/extensions/mysql/mysql/MyDatabase.h index 27cae4f2..c91d7ad8 100644 --- a/extensions/mysql/mysql/MyDatabase.h +++ b/extensions/mysql/mysql/MyDatabase.h @@ -32,8 +32,8 @@ #ifndef _INCLUDE_SM_MYSQL_DATABASE_H_ #define _INCLUDE_SM_MYSQL_DATABASE_H_ -#include #include +#include #include "MyDriver.h" class MyQuery; @@ -70,7 +70,7 @@ public: const DatabaseInfo &GetInfo(); private: MYSQL *m_mysql; - ke::AutoPtr m_FullLock; + std::mutex m_FullLock; /* ---------- */ DatabaseInfo m_Info; diff --git a/extensions/mysql/mysql/MyDriver.cpp b/extensions/mysql/mysql/MyDriver.cpp index fb5cb17e..1555ebeb 100644 --- a/extensions/mysql/mysql/MyDriver.cpp +++ b/extensions/mysql/mysql/MyDriver.cpp @@ -160,7 +160,7 @@ bool CompareField(const char *str1, const char *str2) IDatabase *MyDriver::Connect(const DatabaseInfo *info, bool persistent, char *error, size_t maxlength) { - ke::AutoLock lock(&m_Lock); + std::lock_guard lock(m_Lock); if (persistent) { @@ -202,7 +202,7 @@ IDatabase *MyDriver::Connect(const DatabaseInfo *info, bool persistent, char *er void MyDriver::RemoveFromList(MyDatabase *pdb, bool persistent) { - ke::AutoLock lock(&m_Lock); + std::lock_guard lock(m_Lock); if (persistent) { m_PermDbs.remove(pdb); diff --git a/extensions/mysql/mysql/MyDriver.h b/extensions/mysql/mysql/MyDriver.h index 64dd45e5..3d2d9351 100644 --- a/extensions/mysql/mysql/MyDriver.h +++ b/extensions/mysql/mysql/MyDriver.h @@ -45,7 +45,8 @@ #include #include -#include + +#include using namespace SourceMod; using namespace SourceHook; @@ -71,7 +72,7 @@ public: void Shutdown(); void RemoveFromList(MyDatabase *pdb, bool persistent); private: - ke::Mutex m_Lock; + std::mutex m_Lock; Handle_t m_MyHandle; List m_TempDbs; List m_PermDbs; diff --git a/extensions/sqlite/driver/SqDatabase.cpp b/extensions/sqlite/driver/SqDatabase.cpp index a4f03a89..52df8b39 100644 --- a/extensions/sqlite/driver/SqDatabase.cpp +++ b/extensions/sqlite/driver/SqDatabase.cpp @@ -64,17 +64,13 @@ const char *SqDatabase::GetError(int *errorCode/* =NULL */) bool SqDatabase::LockForFullAtomicOperation() { - if (!m_FullLock) - m_FullLock = new ke::Mutex(); - - m_FullLock->Lock(); + m_FullLock.lock(); return true; } void SqDatabase::UnlockFromFullAtomicOperation() { - if (m_FullLock) - m_FullLock->Unlock(); + m_FullLock.unlock(); } IDBDriver *SqDatabase::GetDriver() diff --git a/extensions/sqlite/driver/SqDatabase.h b/extensions/sqlite/driver/SqDatabase.h index 900fa52b..815eab79 100644 --- a/extensions/sqlite/driver/SqDatabase.h +++ b/extensions/sqlite/driver/SqDatabase.h @@ -33,7 +33,7 @@ #define _INCLUDE_SQLITE_SOURCEMOD_DATABASE_H_ #include -#include +#include #include "SqDriver.h" class SqDatabase @@ -70,7 +70,7 @@ public: } private: sqlite3 *m_sq3; - ke::AutoPtr m_FullLock; + std::mutex m_FullLock; bool m_Persistent; String m_LastError; int m_LastErrorCode; diff --git a/extensions/sqlite/driver/SqDriver.cpp b/extensions/sqlite/driver/SqDriver.cpp index 5c0710f5..3b4ffbaf 100644 --- a/extensions/sqlite/driver/SqDriver.cpp +++ b/extensions/sqlite/driver/SqDriver.cpp @@ -75,7 +75,7 @@ SqDriver::SqDriver() // of g_SqDriver in SqDatabase's destructor. SqDriver::~SqDriver() { - ke::AutoLock lock(&m_OpenLock); + std::lock_guard lock(m_OpenLock); List::iterator iter; SqDatabase *sqdb; @@ -178,7 +178,7 @@ inline bool IsPathSepChar(char c) IDatabase *SqDriver::Connect(const DatabaseInfo *info, bool persistent, char *error, size_t maxlength) { - ke::AutoLock lock(&m_OpenLock); + std::lock_guard lock(m_OpenLock); /* Full path to the database file */ char fullpath[PLATFORM_MAX_PATH]; @@ -296,7 +296,7 @@ IDatabase *SqDriver::Connect(const DatabaseInfo *info, bool persistent, char *er void SqDriver::RemovePersistent(IDatabase *pdb) { - ke::AutoLock lock(&m_OpenLock); + std::lock_guard lock(m_OpenLock); List::iterator iter; for (iter = m_Cache.begin(); iter != m_Cache.end(); iter++) diff --git a/extensions/sqlite/driver/SqDriver.h b/extensions/sqlite/driver/SqDriver.h index ee35235f..e41c9e70 100644 --- a/extensions/sqlite/driver/SqDriver.h +++ b/extensions/sqlite/driver/SqDriver.h @@ -36,7 +36,7 @@ #include #include #include -#include +#include #include "sqlite-source/sqlite3.h" using namespace SourceMod; @@ -72,7 +72,7 @@ public: void RemovePersistent(IDatabase *pdb); private: Handle_t m_Handle; - ke::Mutex m_OpenLock; + std::mutex m_OpenLock; List m_Cache; bool m_bThreadSafe; bool m_bShutdown; diff --git a/public/amtl b/public/amtl index eb7b6ba0..83c4441b 160000 --- a/public/amtl +++ b/public/amtl @@ -1 +1 @@ -Subproject commit eb7b6ba084e13c50f7c2c53b285e6e6af44accd9 +Subproject commit 83c4441b7ede45dfa447bdef614e924c4b7ffea9 diff --git a/sourcepawn b/sourcepawn index 305b59ef..49f94603 160000 --- a/sourcepawn +++ b/sourcepawn @@ -1 +1 @@ -Subproject commit 305b59efbf97875da0ebfd46038fdfe519dbe077 +Subproject commit 49f94603972956a85dab61da475e92795ead40a5