From cbb25464531f37504324e7f4c28395258133f3ec Mon Sep 17 00:00:00 2001 From: David Anderson Date: Fri, 13 Jul 2007 19:20:12 +0000 Subject: [PATCH] initial import of threaded sql functionality --HG-- extra : convert_revision : svn%3A39bc706e-5318-0410-9160-8a85361fbb7c/trunk%401102 --- core/Database.cpp | 238 +++++++++++++++++++++++ core/Database.h | 47 ++++- core/smn_database.cpp | 363 ++++++++++++++++++++++++++++++++++- core/sourcemod.cpp | 3 + plugins/include/dbi.inc | 110 ++++++++++- plugins/testsuite/sqltest.sp | 70 +++++++ 6 files changed, 828 insertions(+), 3 deletions(-) diff --git a/core/Database.cpp b/core/Database.cpp index 667c9d7b..ab80aff2 100644 --- a/core/Database.cpp +++ b/core/Database.cpp @@ -21,12 +21,14 @@ #include "Logger.h" #include "ExtensionSys.h" #include +#include "ThreadSupport.h" #define DBPARSE_LEVEL_NONE 0 #define DBPARSE_LEVEL_MAIN 1 #define DBPARSE_LEVEL_DATABASE 2 DBManager g_DBMan; +static bool s_OneTimeThreaderErrorMsg = false; DBManager::DBManager() : m_StrTab(512), m_ParseLevel(0), m_ParseState(0), m_pDefault(NULL) @@ -47,12 +49,24 @@ void DBManager::OnSourceModAllInitialized() g_ShareSys.AddInterface(NULL, this); g_SourceMod.BuildPath(Path_SM, m_Filename, sizeof(m_Filename), "configs/databases.cfg"); + + m_pConfigLock = g_pThreader->MakeMutex(); + m_pThinkLock = g_pThreader->MakeMutex(); + m_pQueueLock = g_pThreader->MakeMutex(); + + g_PluginSys.AddPluginsListener(this); } void DBManager::OnSourceModLevelChange(const char *mapName) { SMCParseError err; unsigned int line = 0; + + /* We lock and don't give up the lock until we're done. + * This way the thread's search won't be searching through a + * potentially empty/corrupt list, which would be very bad. + */ + m_pConfigLock->Lock(); if ((err = g_TextParser.ParseFile_SMC(m_Filename, this, &line, NULL)) != SMCParse_Okay) { g_Logger.LogError("[SM] Detected parse error(s) in file \"%s\"", m_Filename); @@ -62,10 +76,17 @@ void DBManager::OnSourceModLevelChange(const char *mapName) g_Logger.LogError("[SM] Line %d: %s", line, txt); } } + m_pConfigLock->Unlock(); + + g_PluginSys.RemovePluginsListener(this); } void DBManager::OnSourceModShutdown() { + KillWorkerThread(); + m_pConfigLock->DestroyThis(); + m_pThinkLock->DestroyThis(); + m_pQueueLock->DestroyThis(); g_HandleSys.RemoveType(m_DatabaseType, g_pCoreIdent); g_HandleSys.RemoveType(m_DriverType, g_pCoreIdent); } @@ -269,11 +290,24 @@ bool DBManager::Connect(const char *name, IDBDriver **pdr, IDatabase **pdb, bool void DBManager::AddDriver(IDBDriver *pDriver) { + /* Let's kill the worker. Join the thread and let the queries flush. + * This is kind of stupid but we just want to unload safely. + * Rather than recreate the worker, we'll wait until someone throws + * another query through. + */ + KillWorkerThread(); + m_drivers.push_back(pDriver); } void DBManager::RemoveDriver(IDBDriver *pDriver) { + /* Again, we're forced to kill the worker. How rude! + * Doing this flushes the queue, and thus we don't need to + * clean anything else. + */ + KillWorkerThread(); + for (size_t i=0; i::iterator qiter = m_ThinkQueue.begin(); + Queue templist; + while (qiter != m_ThinkQueue.end()) + { + IDBThreadOperation *op = (*qiter); + if (op->GetDriver() == pDriver) + { + templist.push(op); + qiter = m_ThinkQueue.erase(qiter); + } else { + qiter++; + } + } + + for (qiter = templist.begin(); + qiter != templist.end(); + qiter++) + { + IDBThreadOperation *op = (*qiter); + op->CancelThinkPart(); + } } IDBDriver *DBManager::GetDefaultDriver() @@ -417,3 +476,182 @@ IDBDriver *DBManager::FindOrLoadDriver(const char *name) return NULL; } + +void DBManager::KillWorkerThread() +{ + if (m_pWorker) + { + m_pWorker->Stop(false); + g_pThreader->DestroyWorker(m_pWorker); + m_pWorker = NULL; + s_OneTimeThreaderErrorMsg = false; + } +} + +bool DBManager::AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio) +{ + if (!m_pWorker) + { + m_pWorker = g_pThreader->MakeWorker(this, true); + if (!m_pWorker) + { + if (!s_OneTimeThreaderErrorMsg) + { + g_Logger.LogError("[SM] Unable to create db threader (error unknown)"); + s_OneTimeThreaderErrorMsg = true; + } + return false; + } + if (!m_pWorker->Start()) + { + if (!s_OneTimeThreaderErrorMsg) + { + g_Logger.LogError("[SM] Unable to start db threader (error unknown)"); + s_OneTimeThreaderErrorMsg = true; + } + g_pThreader->DestroyWorker(m_pWorker); + m_pWorker = NULL; + return false; + } + } + + /* Add to the queue */ + { + m_pQueueLock->Lock(); + Queue &queue = m_OpQueue.GetQueue(prio); + queue.push(op); + m_pQueueLock->Unlock(); + } + + /* Make the thread */ + m_pWorker->MakeThread(this); + + return true; +} + +void DBManager::OnWorkerStart(IThreadWorker *pWorker) +{ + m_drSafety.clear(); + for (size_t i=0; iIsThreadSafe()) + { + m_drSafety.push_back(m_drivers[i]->InitializeThreadSafety()); + } else { + m_drSafety.push_back(false); + } + } +} + +void DBManager::OnWorkerStop(IThreadWorker *pWorker) +{ + for (size_t i=0; iShutdownThreadSafety(); + } + } + m_drSafety.clear(); +} + +void DBManager::RunThread(IThreadHandle *pThread) +{ + IDBThreadOperation *op = NULL; + + /* Get something from the queue */ + { + m_pQueueLock->Lock(); + Queue &queue = m_OpQueue.GetLikelyQueue(); + if (!queue.empty()) + { + op = queue.first(); + queue.pop(); + } + m_pQueueLock->Unlock(); + } + + /* whoa. hi. did we get something? we should have. */ + if (!op) + { + /* wtf? */ + return; + } + + op->RunThreadPart(); + + m_pThinkLock->Lock(); + m_ThinkQueue.push(op); + m_pThinkLock->Unlock(); +} + +void DBManager::RunFrame() +{ + /* Don't bother if we're empty */ + if (!m_ThinkQueue.size()) + { + return; + } + + /* Dump one thing per-frame so the server stays sane. */ + m_pThinkLock->Lock(); + IDBThreadOperation *op = m_ThinkQueue.first(); + m_ThinkQueue.pop(); + m_pThinkLock->Unlock(); + op->RunThinkPart(); +} + +void DBManager::OnTerminate(IThreadHandle *pThread, bool cancel) +{ + /* Do nothing */ +} + +void DBManager::OnPluginUnloaded(IPlugin *plugin) +{ + /* Kill the thread so we can flush everything into the think queue... */ + KillWorkerThread(); + + /* Mark the plugin as being unloaded so future database calls will ignore threading... */ + plugin->SetProperty("DisallowDBThreads", NULL); + + /* Run all of the think operations. + * Unlike the driver unloading example, we'll let these calls go through, + * since a plugin unloading is far more normal. + */ + Queue::iterator iter = m_ThinkQueue.begin(); + Queue templist; + while (iter != m_ThinkQueue.end()) + { + IDBThreadOperation *op = (*iter); + if (op->GetPlugin() == plugin) + { + templist.push(op); + iter = m_ThinkQueue.erase(iter); + } else { + iter++; + } + } + + for (iter = templist.begin(); + iter != templist.end(); + iter++) + { + IDBThreadOperation *op = (*iter); + op->RunThinkPart(); + } +} + +void DBManager::LockConfig() +{ + m_pConfigLock->Lock(); +} + +void DBManager::UnlockConfig() +{ + m_pConfigLock->Unlock(); +} + +const char *DBManager::GetDefaultDriverName() +{ + return m_DefDriver.c_str(); +} diff --git a/core/Database.h b/core/Database.h index fa14c553..66647e19 100644 --- a/core/Database.h +++ b/core/Database.h @@ -22,6 +22,9 @@ #include #include #include "sm_memtable.h" +#include +#include "sm_simple_prioqueue.h" +#include "PluginSys.h" using namespace SourceHook; @@ -41,11 +44,24 @@ struct ConfDbInfo DatabaseInfo info; }; +class IDBThreadOperation +{ +public: + virtual IDBDriver *GetDriver() =0; + virtual CPlugin *GetPlugin() =0; + virtual void RunThreadPart() =0; + virtual void RunThinkPart() =0; + virtual void CancelThinkPart() =0; +}; + class DBManager : public IDBManager, public SMGlobalClass, public IHandleTypeDispatch, - public ITextListener_SMC + public ITextListener_SMC, + public IThread, + public IThreadWorkerCallbacks, + public IPluginsListener { public: DBManager(); @@ -74,12 +90,41 @@ public: //ITextListener_SMC SMCParseResult ReadSMC_KeyValue(const char *key, const char *value, bool key_quotes, bool value_quotes); SMCParseResult ReadSMC_LeavingSection(); void ReadSMC_ParseEnd(bool halted, bool failed); +public: //IThread + void RunThread(IThreadHandle *pThread); + void OnTerminate(IThreadHandle *pThread, bool cancel); +public: //IThreadWorkerCallbacks + void OnWorkerStart(IThreadWorker *pWorker); + void OnWorkerStop(IThreadWorker *pWorker); +public: //IPluginsListener + void OnPluginUnloaded(IPlugin *plugin); public: ConfDbInfo *GetDatabaseConf(const char *name); IDBDriver *FindOrLoadDriver(const char *name); IDBDriver *GetDefaultDriver(); + const char *GetDefaultDriverName(); + bool AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio); + void LockConfig(); + void UnlockConfig(); + void RunFrame(); + inline HandleType_t GetDatabaseType() + { + return m_DatabaseType; + } +private: + void KillWorkerThread(); private: CVector m_drivers; + + /* Threading stuff */ + PrioQueue m_OpQueue; + Queue m_ThinkQueue; + CVector m_drSafety; /* which drivers are safe? */ + IThreadWorker *m_pWorker; /* Worker thread object */ + IMutex *m_pConfigLock; /* Configuration lock */ + IMutex *m_pQueueLock; /* Queue safety lock */ + IMutex *m_pThinkLock; /* Think-queue lock */ + List m_confs; HandleType_t m_DriverType; HandleType_t m_DatabaseType; diff --git a/core/smn_database.cpp b/core/smn_database.cpp index 290e0a2a..b10025ef 100644 --- a/core/smn_database.cpp +++ b/core/smn_database.cpp @@ -3,6 +3,7 @@ #include "Database.h" #include "ExtensionSys.h" #include "PluginSys.h" +#include "sm_stringutil.h" HandleType_t hQueryType; HandleType_t hStmtType; @@ -78,6 +79,197 @@ inline HandleError ReadDbOrStmtHndl(Handle_t hndl, IPluginContext *pContext, IDa return err; } +class TQueryOp : public IDBThreadOperation +{ +public: + TQueryOp(IDatabase *db, IPluginFunction *pf, const char *query, cell_t data) : + m_pDatabase(db), m_pFunction(pf), m_Query(query), m_Data(data), + me(g_PluginSys.GetPluginByCtx(pf->GetParentContext()->GetContext())), + m_pQuery(NULL) + { + /* We always increase the reference count because this is potentially + * asynchronous. Otherwise the original handle could be closed while + * we're still latched onto it. + */ + m_pDatabase->IncReferenceCount(); + + /* Now create our own Handle such that it can only be closed by us. + * We allow cloning just in case someone wants to hold onto it. + */ + HandleSecurity sec(me->GetIdentity(), g_pCoreIdent); + HandleAccess access; + g_HandleSys.InitAccessDefaults(NULL, &access); + access.access[HandleAccess_Delete] = HANDLE_RESTRICT_IDENTITY|HANDLE_RESTRICT_OWNER; + m_MyHandle = g_HandleSys.CreateHandleEx(g_DBMan.GetDatabaseType(), + db, + &sec, + &access, + NULL); + } + ~TQueryOp() + { + if (m_pQuery) + { + m_pQuery->Destroy(); + } + + /* Close our Handle if it's valid. */ + if (m_MyHandle != BAD_HANDLE) + { + HandleSecurity sec(me->GetIdentity(), g_pCoreIdent); + g_HandleSys.FreeHandle(m_MyHandle, &sec); + } else { + /* Otherwise, there is an open ref to the db */ + m_pDatabase->Close(); + } + } + CPlugin *GetPlugin() + { + return me; + } + IDBDriver *GetDriver() + { + return m_pDatabase->GetDriver(); + } + void RunThreadPart() + { + m_pDatabase->LockForFullAtomicOperation(); + m_pQuery = m_pDatabase->DoQuery(m_Query.c_str()); + if (!m_pQuery) + { + UTIL_Format(error, sizeof(error), "%s", m_pDatabase->GetError()); + } + m_pDatabase->UnlockFromFullAtomicOperation(); + } + void CancelThinkPart() + { + m_pFunction->PushCell(BAD_HANDLE); + m_pFunction->PushCell(BAD_HANDLE); + m_pFunction->PushString("Driver is unloading"); + m_pFunction->PushCell(m_Data); + m_pFunction->Execute(NULL); + delete this; + } + void RunThinkPart() + { + /* Create a Handle for our query */ + HandleSecurity sec(me->GetIdentity(), g_pCoreIdent); + HandleAccess access; + g_HandleSys.InitAccessDefaults(NULL, &access); + access.access[HandleAccess_Delete] = HANDLE_RESTRICT_IDENTITY|HANDLE_RESTRICT_OWNER; + + Handle_t qh = BAD_HANDLE; + + if (m_pQuery) + { + qh = g_HandleSys.CreateHandle(hQueryType, m_pQuery, me->GetIdentity(), g_pCoreIdent, NULL); + if (qh != BAD_HANDLE) + { + m_pQuery = NULL; + } else { + UTIL_Format(error, sizeof(error), "Could not alloc handle"); + } + } + + m_pFunction->PushCell(m_MyHandle); + m_pFunction->PushCell(qh); + m_pFunction->PushString(qh == BAD_HANDLE ? error : ""); + m_pFunction->PushCell(m_Data); + m_pFunction->Execute(NULL); + + if (qh != BAD_HANDLE) + { + g_HandleSys.FreeHandle(qh, &sec); + } + + delete this; + } +private: + IDatabase *m_pDatabase; + IPluginFunction *m_pFunction; + String m_Query; + cell_t m_Data; + CPlugin *me; + IQuery *m_pQuery; + char error[255]; + Handle_t m_MyHandle; +}; + +class TConnectOp : public IDBThreadOperation +{ +public: + TConnectOp(IPluginFunction *func, IDBDriver *driver, const char *_dbname) + { + m_pFunction = func; + m_pDriver = driver; + m_pDatabase = NULL; + error[0] = '\0'; + strncopy(dbname, _dbname, sizeof(dbname)); + me = g_PluginSys.GetPluginByCtx(m_pFunction->GetParentContext()->GetContext()); + } + CPlugin *GetPlugin() + { + return me; + } + IDBDriver *GetDriver() + { + return m_pDriver; + } + void RunThreadPart() + { + g_DBMan.LockConfig(); + const DatabaseInfo *pInfo = g_DBMan.FindDatabaseConf(dbname); + if (!pInfo) + { + UTIL_Format(error, sizeof(error), "Could not find database config \"%s\"", dbname); + } else { + m_pDatabase = m_pDriver->Connect(pInfo, false, error, sizeof(error)); + } + g_DBMan.UnlockConfig(); + } + void CancelThinkPart() + { + if (m_pDatabase) + { + m_pDatabase->Close(); + } + m_pFunction->PushCell(BAD_HANDLE); + m_pFunction->PushCell(BAD_HANDLE); + m_pFunction->PushString("Driver is unloading"); + m_pFunction->PushCell(0); + m_pFunction->Execute(NULL); + delete this; + } + void RunThinkPart() + { + Handle_t hndl = BAD_HANDLE; + + if (m_pDatabase) + { + if ((hndl = g_DBMan.CreateHandle(DBHandle_Database, m_pDatabase, me->GetIdentity())) + == BAD_HANDLE) + { + m_pDatabase->Close(); + UTIL_Format(error, sizeof(error), "Unable to allocate Handle"); + } + } + + m_pFunction->PushCell(m_pDriver->GetHandle()); + m_pFunction->PushCell(hndl); + m_pFunction->PushString(hndl == BAD_HANDLE ? "" : error); + m_pFunction->PushCell(0); + m_pFunction->Execute(NULL); + delete this; + } +private: + CPlugin *me; + IPluginFunction *m_pFunction; + IDBDriver *m_pDriver; + IDatabase *m_pDatabase; + char dbname[64]; + char error[255]; +}; + static cell_t SQL_Connect(IPluginContext *pContext, const cell_t *params) { char *conf, *err; @@ -111,6 +303,75 @@ static cell_t SQL_Connect(IPluginContext *pContext, const cell_t *params) return hndl; } +static cell_t SQL_TConnect(IPluginContext *pContext, const cell_t *params) +{ + IPluginFunction *pf = pContext->GetFunctionById(params[1]); + if (!pf) + { + return pContext->ThrowNativeError("Function id %x is invalid", params[1]); + } + + char *conf; + pContext->LocalToString(params[2], &conf); + + IDBDriver *driver = NULL; + const DatabaseInfo *pInfo = g_DBMan.FindDatabaseConf(conf); + char error[255]; + if (pInfo != NULL) + { + if (pInfo->driver[0] == '\0') + { + driver = g_DBMan.GetDefaultDriver(); + } else { + driver = g_DBMan.FindOrLoadDriver(pInfo->driver); + } + if (!driver) + { + UTIL_Format(error, + sizeof(error), + "Could not find driver \"%s\"", + pInfo->driver[0] == '\0' ? g_DBMan.GetDefaultDriverName() : pInfo->driver); + } else if (!driver->IsThreadSafe()) { + UTIL_Format(error, + sizeof(error), + "Driver \"%s\" is not thread safe!", + driver->GetIdentifier()); + } + } else { + UTIL_Format(error, sizeof(error), "Could not find database conf \"%s\"", conf); + } + + if (!pInfo || !driver) + { + pf->PushCell(BAD_HANDLE); + pf->PushCell(BAD_HANDLE); + pf->PushString(error); + pf->PushCell(0); + pf->Execute(NULL); + return 0; + } + + /* HACK! Add us to the dependency list */ + CExtension *pExt = g_Extensions.GetExtensionFromIdent(driver->GetIdentity()); + if (pExt) + { + g_Extensions.BindChildPlugin(pExt, g_PluginSys.FindPluginByContext(pContext->GetContext())); + } + + /* Finally, add to the thread if we can */ + TConnectOp *op = new TConnectOp(pf, driver, conf); + CPlugin *pPlugin = g_PluginSys.GetPluginByCtx(pContext->GetContext()); + if (pPlugin->GetProperty("DisallowDBThreads", NULL) + || !g_DBMan.AddToThreadQueue(op, PrioQueue_High)) + { + /* Do everything right now */ + op->RunThreadPart(); + op->RunThinkPart(); + } + + return 1; +} + static cell_t SQL_ConnectEx(IPluginContext *pContext, const cell_t *params) { IDBDriver *driver; @@ -380,6 +641,81 @@ static cell_t SQL_Query(IPluginContext *pContext, const cell_t *params) return hndl; } +static cell_t SQL_TQuery(IPluginContext *pContext, const cell_t *params) +{ + IDatabase *db = NULL; + HandleError err; + + if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db)) + != HandleError_None) + { + return pContext->ThrowNativeError("Invalid database Handle %x (error: %d)", params[1], err); + } + + IPluginFunction *pf = pContext->GetFunctionById(params[2]); + if (!pf) + { + return pContext->ThrowNativeError("Function id %x is invalid", params[2]); + } + + char *query; + pContext->LocalToString(params[3], &query); + + cell_t data = params[4]; + PrioQueueLevel level = PrioQueue_Normal; + if (params[5] == (cell_t)PrioQueue_High) + { + level = PrioQueue_High; + } else if (params[5] == (cell_t)PrioQueue_Low) { + level = PrioQueue_Low; + } + + CPlugin *pPlugin = g_PluginSys.GetPluginByCtx(pContext->GetContext()); + + TQueryOp *op = new TQueryOp(db, pf, query, data); + if (pPlugin->GetProperty("DisallowDBThreads", NULL) + || !g_DBMan.AddToThreadQueue(op, level)) + { + /* Do everything right now */ + op->RunThreadPart(); + op->RunThinkPart(); + } + + return 1; +} + +static cell_t SQL_LockDatabase(IPluginContext *pContext, const cell_t *params) +{ + IDatabase *db = NULL; + HandleError err; + + if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db)) + != HandleError_None) + { + return pContext->ThrowNativeError("Invalid database Handle %x (error: %d)", params[1], err); + } + + db->LockForFullAtomicOperation(); + + return 1; +} + +static cell_t SQL_UnlockDatabase(IPluginContext *pContext, const cell_t *params) +{ + IDatabase *db = NULL; + HandleError err; + + if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db)) + != HandleError_None) + { + return pContext->ThrowNativeError("Invalid database Handle %x (error: %d)", params[1], err); + } + + db->UnlockFromFullAtomicOperation(); + + return 1; +} + static cell_t SQL_PrepareQuery(IPluginContext *pContext, const cell_t *params) { IDatabase *db = NULL; @@ -838,6 +1174,26 @@ static cell_t SQL_Execute(IPluginContext *pContext, const cell_t *params) return stmt->Execute() ? 1 : 0; } +static cell_t SQL_IsSameConnection(IPluginContext *pContext, const cell_t *params) +{ + IDatabase *db1=NULL, *db2=NULL; + HandleError err; + + if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db1)) + != HandleError_None) + { + return pContext->ThrowNativeError("Invalid database Handle 1/%x (error: %d)", params[1], err); + } + + if ((err = g_DBMan.ReadHandle(params[2], DBHandle_Database, (void **)&db2)) + != HandleError_None) + { + return pContext->ThrowNativeError("Invalid database Handle 2/%x (error: %d)", params[2], err); + } + + return (db1 == db2) ? true : false; +} + REGISTER_NATIVES(dbNatives) { {"SQL_BindParamInt", SQL_BindParamInt}, @@ -865,10 +1221,15 @@ REGISTER_NATIVES(dbNatives) {"SQL_GetRowCount", SQL_GetRowCount}, {"SQL_HasResultSet", SQL_HasResultSet}, {"SQL_IsFieldNull", SQL_IsFieldNull}, + {"SQL_IsSameConnection", SQL_IsSameConnection}, + {"SQL_LockDatabase", SQL_LockDatabase}, {"SQL_MoreRows", SQL_MoreRows}, + {"SQL_PrepareQuery", SQL_PrepareQuery}, {"SQL_Query", SQL_Query}, {"SQL_QuoteString", SQL_QuoteString}, - {"SQL_PrepareQuery", SQL_PrepareQuery}, {"SQL_Rewind", SQL_Rewind}, + {"SQL_TConnect", SQL_TConnect}, + {"SQL_TQuery", SQL_TQuery}, + {"SQL_UnlockDatabase", SQL_UnlockDatabase}, {NULL, NULL}, }; diff --git a/core/sourcemod.cpp b/core/sourcemod.cpp index b01eee7f..cd0f54dc 100644 --- a/core/sourcemod.cpp +++ b/core/sourcemod.cpp @@ -31,6 +31,7 @@ #include "TimerSys.h" #include "MenuStyle_Valve.h" #include "MenuStyle_Radio.h" +#include "Database.h" SH_DECL_HOOK6(IServerGameDLL, LevelInit, SH_NOATTRIB, false, bool, const char *, const char *, const char *, const char *, bool, bool); SH_DECL_HOOK0_void(IServerGameDLL, LevelShutdown, SH_NOATTRIB, false); @@ -337,6 +338,8 @@ void SimulateTick() void SourceModBase::GameFrame(bool simulating) { + g_DBMan.RunFrame(); + /** * Note: This is all hardcoded rather than delegated to save * precious CPU cycles. diff --git a/plugins/include/dbi.inc b/plugins/include/dbi.inc index ea420fe7..8b8610a9 100644 --- a/plugins/include/dbi.inc +++ b/plugins/include/dbi.inc @@ -56,7 +56,27 @@ enum DBResult DBVal_TypeMismatch = 1, /**< You cannot retrieve this data with this type. */ DBVal_Null = 2, /**< Field has no data (NULL) */ DBVal_Data = 3, /**< Field has data */ -} +}; + +/** + * Describes binding types. + */ +enum DBBindType +{ + DBBind_Int = 0, /**< Bind an integer. */ + DBBind_Float = 1, /**< Bind a float. */ + DBBind_String = 2, /**< Bind a string. */ +}; + +/** + * Threading priority level. + */ +enum DBPriority +{ + DBPrio_High = 0, /**< High priority. */ + DBPrio_Normal = 1, /**< Normal priority. */ + DBPrio_Low = 2, /**< Low priority. */ +}; /** * Creates an SQL connection from a named configuration. @@ -452,3 +472,91 @@ native SQL_BindParamString(Handle:statement, param, const String:value[], bool:c * @error Invalid statement Handle. */ native bool:SQL_Execute(Handle:statement); + +/** + * Locks a database so threading operations will not interrupt. + * + * If you are using a database Handle for both threading and non-threading, + * this MUST be called before doing any set of non-threading DB operations. + * Otherwise you risk corrupting the database driver's memory or network + * connection. + * + * Leaving a lock on a database and then executing a threaded query results + * in a dead lock! Make sure to call SQL_UnlockDatabase()! + * + * If the lock cannot be acquired, the main thread will pause until the + * threaded operation has concluded. + * + * @param database A database Handle. + * @noreturn + * @error Invalid database Handle. + */ +native SQL_LockDatabase(Handle:database); + +/** + * Unlocks a database so threading operations may continue. + * + * @param database A database Handle. + * @noreturn + * @error Invalid database Handle. + */ +native SQL_UnlockDatabase(Handle:database); + +/** + * General callback for threaded SQL stuff. + * + * @param db Parent object of the Handle (or INVALID_HANDLE if none). + * @param hndl Handle to the child object (or INVALID_HANDLE if none). + * @param error Error string, if any. + * @param + */ +functag SQLTCallback public(Handle:owner, Handle:hndl, const String:error[], any:data); + +/** + * Tells whether two database handles both point to the same database + * connection. + * + * @param hndl1 First database Handle. + * @param hndl2 Second database Handle. + * @return True if the Handles point to the same + * connection, false otherwise. + * @error Invalid Handle. + */ +native bool:SQL_IsSameConnection(Handle:hndl1, Handle:hndl2); + +/** + * Connects to a database via a thread. This can be used instead of + * SQL_Connect() if you wish for non-blocking functionality. + * + * It is not necessary to use this to use threaded queries. However, if you + * don't (or you mix threaded/non-threaded queries), you should see + * SQL_LockDatabase(). + * + * @param callback Callback; new Handle will be in hndl, owner is the driver. + * If no driver was found, the owner is INVALID_HANDLE. + * @param name Database name. + * @noreturn + */ +native SQL_TConnect(SQLTCallback:callback, const String:name[]="default"); + +/** + * Executes a simple query via a thread. The query Handle is passed through + * the callback. + * + * The database Handle returned through the callback is always a new Handle, + * and if necessary, SQL_IsSameConnection() should be used to test against + * other conenctions. + * + * The query Handle returned through the callback is temporary and destroyed + * at the end of the callback. If you need to hold onto it, use CloneHandle(). + * + * @param database A database Handle. + * @param callback Callback; database is in "owner" and the query Handle + * is passed in "hndl". + * @param query Query string. + * @param data Extra data value to pass to the callback. + * @param prio Priority queue to use. + * @noreturn + * @error Invalid database Handle. + */ +native SQL_TQuery(Handle:database, SQLTCallback:callback, const String:query[], any:data=0, DBPriority:prio=DBPrio_Normal); diff --git a/plugins/testsuite/sqltest.sp b/plugins/testsuite/sqltest.sp index 22dd092b..71e20179 100644 --- a/plugins/testsuite/sqltest.sp +++ b/plugins/testsuite/sqltest.sp @@ -13,6 +13,9 @@ public OnPluginStart() { RegServerCmd("sql_test_normal", Command_TestSql1) RegServerCmd("sql_test_stmt", Command_TestSql2) + RegServerCmd("sql_test_thread1", Command_TestSql3) + RegServerCmd("sql_test_thread2", Command_TestSql4) + RegServerCmd("sql_test_thread3", Command_TestSql5) } PrintQueryData(Handle:query) @@ -105,4 +108,71 @@ public Action:Command_TestSql2(args) return Plugin_Handled; } +new Handle:g_ThreadedHandle = INVALID_HANDLE; + +public CallbackTest3(Handle:owner, Handle:hndl, const String:error[], any:data) +{ + PrintToServer("CallbackTest1() (owner %x) (hndl %x) (error \"%s\") (data %d)", owner, hndl, error, data); + if (g_ThreadedHandle != INVALID_HANDLE && hndl != INVALID_HANDLE) + { + CloseHandle(hndl); + } else { + g_ThreadedHandle = hndl; + } +} + +public Action:Command_TestSql3(args) +{ + if (g_ThreadedHandle != INVALID_HANDLE) + { + PrintToServer("A threaded connection already exists, run the next test"); + return Plugin_Handled; + } + + new String:name[32]; + GetCmdArg(1, name, sizeof(name)); + + SQL_TConnect(CallbackTest3, name); + + return Plugin_Handled; +} + + +public Action:Command_TestSql4(args) +{ + SQL_LockDatabase(g_ThreadedHandle); + new Handle:query = SQL_Query(g_ThreadedHandle, "SELECT * FROM gaben") + if (query == INVALID_HANDLE) + { + new String:error[255]; + SQL_GetError(g_ThreadedHandle, error, sizeof(error)) + PrintToServer("Failed to query: %s", error) + } else { + PrintQueryData(query) + CloseHandle(query) + } + SQL_UnlockDatabase(g_ThreadedHandle); + + return Plugin_Handled; +} + +public CallbackTest5(Handle:owner, Handle:hndl, const String:error[], any:data) +{ + if (hndl == INVALID_HANDLE) + { + PrintToServer("Failed to query: %s", error) + } else { + PrintQueryData(hndl) + } +} + +public Action:Command_TestSql5(args) +{ + SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52) + SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52) + SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52) + SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52) + + return Plugin_Handled; +}