initial import of threaded sql functionality

--HG--
extra : convert_revision : svn%3A39bc706e-5318-0410-9160-8a85361fbb7c/trunk%401102
This commit is contained in:
David Anderson 2007-07-13 19:20:12 +00:00
parent 93c9ffdf02
commit cbb2546453
6 changed files with 828 additions and 3 deletions

View File

@ -21,12 +21,14 @@
#include "Logger.h" #include "Logger.h"
#include "ExtensionSys.h" #include "ExtensionSys.h"
#include <stdlib.h> #include <stdlib.h>
#include "ThreadSupport.h"
#define DBPARSE_LEVEL_NONE 0 #define DBPARSE_LEVEL_NONE 0
#define DBPARSE_LEVEL_MAIN 1 #define DBPARSE_LEVEL_MAIN 1
#define DBPARSE_LEVEL_DATABASE 2 #define DBPARSE_LEVEL_DATABASE 2
DBManager g_DBMan; DBManager g_DBMan;
static bool s_OneTimeThreaderErrorMsg = false;
DBManager::DBManager() DBManager::DBManager()
: m_StrTab(512), m_ParseLevel(0), m_ParseState(0), m_pDefault(NULL) : m_StrTab(512), m_ParseLevel(0), m_ParseState(0), m_pDefault(NULL)
@ -47,12 +49,24 @@ void DBManager::OnSourceModAllInitialized()
g_ShareSys.AddInterface(NULL, this); g_ShareSys.AddInterface(NULL, this);
g_SourceMod.BuildPath(Path_SM, m_Filename, sizeof(m_Filename), "configs/databases.cfg"); g_SourceMod.BuildPath(Path_SM, m_Filename, sizeof(m_Filename), "configs/databases.cfg");
m_pConfigLock = g_pThreader->MakeMutex();
m_pThinkLock = g_pThreader->MakeMutex();
m_pQueueLock = g_pThreader->MakeMutex();
g_PluginSys.AddPluginsListener(this);
} }
void DBManager::OnSourceModLevelChange(const char *mapName) void DBManager::OnSourceModLevelChange(const char *mapName)
{ {
SMCParseError err; SMCParseError err;
unsigned int line = 0; unsigned int line = 0;
/* We lock and don't give up the lock until we're done.
* This way the thread's search won't be searching through a
* potentially empty/corrupt list, which would be very bad.
*/
m_pConfigLock->Lock();
if ((err = g_TextParser.ParseFile_SMC(m_Filename, this, &line, NULL)) != SMCParse_Okay) if ((err = g_TextParser.ParseFile_SMC(m_Filename, this, &line, NULL)) != SMCParse_Okay)
{ {
g_Logger.LogError("[SM] Detected parse error(s) in file \"%s\"", m_Filename); g_Logger.LogError("[SM] Detected parse error(s) in file \"%s\"", m_Filename);
@ -62,10 +76,17 @@ void DBManager::OnSourceModLevelChange(const char *mapName)
g_Logger.LogError("[SM] Line %d: %s", line, txt); g_Logger.LogError("[SM] Line %d: %s", line, txt);
} }
} }
m_pConfigLock->Unlock();
g_PluginSys.RemovePluginsListener(this);
} }
void DBManager::OnSourceModShutdown() void DBManager::OnSourceModShutdown()
{ {
KillWorkerThread();
m_pConfigLock->DestroyThis();
m_pThinkLock->DestroyThis();
m_pQueueLock->DestroyThis();
g_HandleSys.RemoveType(m_DatabaseType, g_pCoreIdent); g_HandleSys.RemoveType(m_DatabaseType, g_pCoreIdent);
g_HandleSys.RemoveType(m_DriverType, g_pCoreIdent); g_HandleSys.RemoveType(m_DriverType, g_pCoreIdent);
} }
@ -269,11 +290,24 @@ bool DBManager::Connect(const char *name, IDBDriver **pdr, IDatabase **pdb, bool
void DBManager::AddDriver(IDBDriver *pDriver) void DBManager::AddDriver(IDBDriver *pDriver)
{ {
/* Let's kill the worker. Join the thread and let the queries flush.
* This is kind of stupid but we just want to unload safely.
* Rather than recreate the worker, we'll wait until someone throws
* another query through.
*/
KillWorkerThread();
m_drivers.push_back(pDriver); m_drivers.push_back(pDriver);
} }
void DBManager::RemoveDriver(IDBDriver *pDriver) void DBManager::RemoveDriver(IDBDriver *pDriver)
{ {
/* Again, we're forced to kill the worker. How rude!
* Doing this flushes the queue, and thus we don't need to
* clean anything else.
*/
KillWorkerThread();
for (size_t i=0; i<m_drivers.size(); i++) for (size_t i=0; i<m_drivers.size(); i++)
{ {
if (m_drivers[i] == pDriver) if (m_drivers[i] == pDriver)
@ -293,6 +327,31 @@ void DBManager::RemoveDriver(IDBDriver *pDriver)
db.realDriver = NULL; db.realDriver = NULL;
} }
} }
/* Now that the driver is gone, we have to test the think queue.
* Whatever happens therein is up to the db op!
*/
Queue<IDBThreadOperation *>::iterator qiter = m_ThinkQueue.begin();
Queue<IDBThreadOperation *> templist;
while (qiter != m_ThinkQueue.end())
{
IDBThreadOperation *op = (*qiter);
if (op->GetDriver() == pDriver)
{
templist.push(op);
qiter = m_ThinkQueue.erase(qiter);
} else {
qiter++;
}
}
for (qiter = templist.begin();
qiter != templist.end();
qiter++)
{
IDBThreadOperation *op = (*qiter);
op->CancelThinkPart();
}
} }
IDBDriver *DBManager::GetDefaultDriver() IDBDriver *DBManager::GetDefaultDriver()
@ -417,3 +476,182 @@ IDBDriver *DBManager::FindOrLoadDriver(const char *name)
return NULL; return NULL;
} }
void DBManager::KillWorkerThread()
{
if (m_pWorker)
{
m_pWorker->Stop(false);
g_pThreader->DestroyWorker(m_pWorker);
m_pWorker = NULL;
s_OneTimeThreaderErrorMsg = false;
}
}
bool DBManager::AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio)
{
if (!m_pWorker)
{
m_pWorker = g_pThreader->MakeWorker(this, true);
if (!m_pWorker)
{
if (!s_OneTimeThreaderErrorMsg)
{
g_Logger.LogError("[SM] Unable to create db threader (error unknown)");
s_OneTimeThreaderErrorMsg = true;
}
return false;
}
if (!m_pWorker->Start())
{
if (!s_OneTimeThreaderErrorMsg)
{
g_Logger.LogError("[SM] Unable to start db threader (error unknown)");
s_OneTimeThreaderErrorMsg = true;
}
g_pThreader->DestroyWorker(m_pWorker);
m_pWorker = NULL;
return false;
}
}
/* Add to the queue */
{
m_pQueueLock->Lock();
Queue<IDBThreadOperation *> &queue = m_OpQueue.GetQueue(prio);
queue.push(op);
m_pQueueLock->Unlock();
}
/* Make the thread */
m_pWorker->MakeThread(this);
return true;
}
void DBManager::OnWorkerStart(IThreadWorker *pWorker)
{
m_drSafety.clear();
for (size_t i=0; i<m_drivers.size(); i++)
{
if (m_drivers[i]->IsThreadSafe())
{
m_drSafety.push_back(m_drivers[i]->InitializeThreadSafety());
} else {
m_drSafety.push_back(false);
}
}
}
void DBManager::OnWorkerStop(IThreadWorker *pWorker)
{
for (size_t i=0; i<m_drivers.size(); i++)
{
if (m_drSafety[i])
{
m_drivers[i]->ShutdownThreadSafety();
}
}
m_drSafety.clear();
}
void DBManager::RunThread(IThreadHandle *pThread)
{
IDBThreadOperation *op = NULL;
/* Get something from the queue */
{
m_pQueueLock->Lock();
Queue<IDBThreadOperation *> &queue = m_OpQueue.GetLikelyQueue();
if (!queue.empty())
{
op = queue.first();
queue.pop();
}
m_pQueueLock->Unlock();
}
/* whoa. hi. did we get something? we should have. */
if (!op)
{
/* wtf? */
return;
}
op->RunThreadPart();
m_pThinkLock->Lock();
m_ThinkQueue.push(op);
m_pThinkLock->Unlock();
}
void DBManager::RunFrame()
{
/* Don't bother if we're empty */
if (!m_ThinkQueue.size())
{
return;
}
/* Dump one thing per-frame so the server stays sane. */
m_pThinkLock->Lock();
IDBThreadOperation *op = m_ThinkQueue.first();
m_ThinkQueue.pop();
m_pThinkLock->Unlock();
op->RunThinkPart();
}
void DBManager::OnTerminate(IThreadHandle *pThread, bool cancel)
{
/* Do nothing */
}
void DBManager::OnPluginUnloaded(IPlugin *plugin)
{
/* Kill the thread so we can flush everything into the think queue... */
KillWorkerThread();
/* Mark the plugin as being unloaded so future database calls will ignore threading... */
plugin->SetProperty("DisallowDBThreads", NULL);
/* Run all of the think operations.
* Unlike the driver unloading example, we'll let these calls go through,
* since a plugin unloading is far more normal.
*/
Queue<IDBThreadOperation *>::iterator iter = m_ThinkQueue.begin();
Queue<IDBThreadOperation *> templist;
while (iter != m_ThinkQueue.end())
{
IDBThreadOperation *op = (*iter);
if (op->GetPlugin() == plugin)
{
templist.push(op);
iter = m_ThinkQueue.erase(iter);
} else {
iter++;
}
}
for (iter = templist.begin();
iter != templist.end();
iter++)
{
IDBThreadOperation *op = (*iter);
op->RunThinkPart();
}
}
void DBManager::LockConfig()
{
m_pConfigLock->Lock();
}
void DBManager::UnlockConfig()
{
m_pConfigLock->Unlock();
}
const char *DBManager::GetDefaultDriverName()
{
return m_DefDriver.c_str();
}

View File

@ -22,6 +22,9 @@
#include <sh_list.h> #include <sh_list.h>
#include <ITextParsers.h> #include <ITextParsers.h>
#include "sm_memtable.h" #include "sm_memtable.h"
#include <IThreader.h>
#include "sm_simple_prioqueue.h"
#include "PluginSys.h"
using namespace SourceHook; using namespace SourceHook;
@ -41,11 +44,24 @@ struct ConfDbInfo
DatabaseInfo info; DatabaseInfo info;
}; };
class IDBThreadOperation
{
public:
virtual IDBDriver *GetDriver() =0;
virtual CPlugin *GetPlugin() =0;
virtual void RunThreadPart() =0;
virtual void RunThinkPart() =0;
virtual void CancelThinkPart() =0;
};
class DBManager : class DBManager :
public IDBManager, public IDBManager,
public SMGlobalClass, public SMGlobalClass,
public IHandleTypeDispatch, public IHandleTypeDispatch,
public ITextListener_SMC public ITextListener_SMC,
public IThread,
public IThreadWorkerCallbacks,
public IPluginsListener
{ {
public: public:
DBManager(); DBManager();
@ -74,12 +90,41 @@ public: //ITextListener_SMC
SMCParseResult ReadSMC_KeyValue(const char *key, const char *value, bool key_quotes, bool value_quotes); SMCParseResult ReadSMC_KeyValue(const char *key, const char *value, bool key_quotes, bool value_quotes);
SMCParseResult ReadSMC_LeavingSection(); SMCParseResult ReadSMC_LeavingSection();
void ReadSMC_ParseEnd(bool halted, bool failed); void ReadSMC_ParseEnd(bool halted, bool failed);
public: //IThread
void RunThread(IThreadHandle *pThread);
void OnTerminate(IThreadHandle *pThread, bool cancel);
public: //IThreadWorkerCallbacks
void OnWorkerStart(IThreadWorker *pWorker);
void OnWorkerStop(IThreadWorker *pWorker);
public: //IPluginsListener
void OnPluginUnloaded(IPlugin *plugin);
public: public:
ConfDbInfo *GetDatabaseConf(const char *name); ConfDbInfo *GetDatabaseConf(const char *name);
IDBDriver *FindOrLoadDriver(const char *name); IDBDriver *FindOrLoadDriver(const char *name);
IDBDriver *GetDefaultDriver(); IDBDriver *GetDefaultDriver();
const char *GetDefaultDriverName();
bool AddToThreadQueue(IDBThreadOperation *op, PrioQueueLevel prio);
void LockConfig();
void UnlockConfig();
void RunFrame();
inline HandleType_t GetDatabaseType()
{
return m_DatabaseType;
}
private:
void KillWorkerThread();
private: private:
CVector<IDBDriver *> m_drivers; CVector<IDBDriver *> m_drivers;
/* Threading stuff */
PrioQueue<IDBThreadOperation *> m_OpQueue;
Queue<IDBThreadOperation *> m_ThinkQueue;
CVector<bool> m_drSafety; /* which drivers are safe? */
IThreadWorker *m_pWorker; /* Worker thread object */
IMutex *m_pConfigLock; /* Configuration lock */
IMutex *m_pQueueLock; /* Queue safety lock */
IMutex *m_pThinkLock; /* Think-queue lock */
List<ConfDbInfo> m_confs; List<ConfDbInfo> m_confs;
HandleType_t m_DriverType; HandleType_t m_DriverType;
HandleType_t m_DatabaseType; HandleType_t m_DatabaseType;

View File

@ -3,6 +3,7 @@
#include "Database.h" #include "Database.h"
#include "ExtensionSys.h" #include "ExtensionSys.h"
#include "PluginSys.h" #include "PluginSys.h"
#include "sm_stringutil.h"
HandleType_t hQueryType; HandleType_t hQueryType;
HandleType_t hStmtType; HandleType_t hStmtType;
@ -78,6 +79,197 @@ inline HandleError ReadDbOrStmtHndl(Handle_t hndl, IPluginContext *pContext, IDa
return err; return err;
} }
class TQueryOp : public IDBThreadOperation
{
public:
TQueryOp(IDatabase *db, IPluginFunction *pf, const char *query, cell_t data) :
m_pDatabase(db), m_pFunction(pf), m_Query(query), m_Data(data),
me(g_PluginSys.GetPluginByCtx(pf->GetParentContext()->GetContext())),
m_pQuery(NULL)
{
/* We always increase the reference count because this is potentially
* asynchronous. Otherwise the original handle could be closed while
* we're still latched onto it.
*/
m_pDatabase->IncReferenceCount();
/* Now create our own Handle such that it can only be closed by us.
* We allow cloning just in case someone wants to hold onto it.
*/
HandleSecurity sec(me->GetIdentity(), g_pCoreIdent);
HandleAccess access;
g_HandleSys.InitAccessDefaults(NULL, &access);
access.access[HandleAccess_Delete] = HANDLE_RESTRICT_IDENTITY|HANDLE_RESTRICT_OWNER;
m_MyHandle = g_HandleSys.CreateHandleEx(g_DBMan.GetDatabaseType(),
db,
&sec,
&access,
NULL);
}
~TQueryOp()
{
if (m_pQuery)
{
m_pQuery->Destroy();
}
/* Close our Handle if it's valid. */
if (m_MyHandle != BAD_HANDLE)
{
HandleSecurity sec(me->GetIdentity(), g_pCoreIdent);
g_HandleSys.FreeHandle(m_MyHandle, &sec);
} else {
/* Otherwise, there is an open ref to the db */
m_pDatabase->Close();
}
}
CPlugin *GetPlugin()
{
return me;
}
IDBDriver *GetDriver()
{
return m_pDatabase->GetDriver();
}
void RunThreadPart()
{
m_pDatabase->LockForFullAtomicOperation();
m_pQuery = m_pDatabase->DoQuery(m_Query.c_str());
if (!m_pQuery)
{
UTIL_Format(error, sizeof(error), "%s", m_pDatabase->GetError());
}
m_pDatabase->UnlockFromFullAtomicOperation();
}
void CancelThinkPart()
{
m_pFunction->PushCell(BAD_HANDLE);
m_pFunction->PushCell(BAD_HANDLE);
m_pFunction->PushString("Driver is unloading");
m_pFunction->PushCell(m_Data);
m_pFunction->Execute(NULL);
delete this;
}
void RunThinkPart()
{
/* Create a Handle for our query */
HandleSecurity sec(me->GetIdentity(), g_pCoreIdent);
HandleAccess access;
g_HandleSys.InitAccessDefaults(NULL, &access);
access.access[HandleAccess_Delete] = HANDLE_RESTRICT_IDENTITY|HANDLE_RESTRICT_OWNER;
Handle_t qh = BAD_HANDLE;
if (m_pQuery)
{
qh = g_HandleSys.CreateHandle(hQueryType, m_pQuery, me->GetIdentity(), g_pCoreIdent, NULL);
if (qh != BAD_HANDLE)
{
m_pQuery = NULL;
} else {
UTIL_Format(error, sizeof(error), "Could not alloc handle");
}
}
m_pFunction->PushCell(m_MyHandle);
m_pFunction->PushCell(qh);
m_pFunction->PushString(qh == BAD_HANDLE ? error : "");
m_pFunction->PushCell(m_Data);
m_pFunction->Execute(NULL);
if (qh != BAD_HANDLE)
{
g_HandleSys.FreeHandle(qh, &sec);
}
delete this;
}
private:
IDatabase *m_pDatabase;
IPluginFunction *m_pFunction;
String m_Query;
cell_t m_Data;
CPlugin *me;
IQuery *m_pQuery;
char error[255];
Handle_t m_MyHandle;
};
class TConnectOp : public IDBThreadOperation
{
public:
TConnectOp(IPluginFunction *func, IDBDriver *driver, const char *_dbname)
{
m_pFunction = func;
m_pDriver = driver;
m_pDatabase = NULL;
error[0] = '\0';
strncopy(dbname, _dbname, sizeof(dbname));
me = g_PluginSys.GetPluginByCtx(m_pFunction->GetParentContext()->GetContext());
}
CPlugin *GetPlugin()
{
return me;
}
IDBDriver *GetDriver()
{
return m_pDriver;
}
void RunThreadPart()
{
g_DBMan.LockConfig();
const DatabaseInfo *pInfo = g_DBMan.FindDatabaseConf(dbname);
if (!pInfo)
{
UTIL_Format(error, sizeof(error), "Could not find database config \"%s\"", dbname);
} else {
m_pDatabase = m_pDriver->Connect(pInfo, false, error, sizeof(error));
}
g_DBMan.UnlockConfig();
}
void CancelThinkPart()
{
if (m_pDatabase)
{
m_pDatabase->Close();
}
m_pFunction->PushCell(BAD_HANDLE);
m_pFunction->PushCell(BAD_HANDLE);
m_pFunction->PushString("Driver is unloading");
m_pFunction->PushCell(0);
m_pFunction->Execute(NULL);
delete this;
}
void RunThinkPart()
{
Handle_t hndl = BAD_HANDLE;
if (m_pDatabase)
{
if ((hndl = g_DBMan.CreateHandle(DBHandle_Database, m_pDatabase, me->GetIdentity()))
== BAD_HANDLE)
{
m_pDatabase->Close();
UTIL_Format(error, sizeof(error), "Unable to allocate Handle");
}
}
m_pFunction->PushCell(m_pDriver->GetHandle());
m_pFunction->PushCell(hndl);
m_pFunction->PushString(hndl == BAD_HANDLE ? "" : error);
m_pFunction->PushCell(0);
m_pFunction->Execute(NULL);
delete this;
}
private:
CPlugin *me;
IPluginFunction *m_pFunction;
IDBDriver *m_pDriver;
IDatabase *m_pDatabase;
char dbname[64];
char error[255];
};
static cell_t SQL_Connect(IPluginContext *pContext, const cell_t *params) static cell_t SQL_Connect(IPluginContext *pContext, const cell_t *params)
{ {
char *conf, *err; char *conf, *err;
@ -111,6 +303,75 @@ static cell_t SQL_Connect(IPluginContext *pContext, const cell_t *params)
return hndl; return hndl;
} }
static cell_t SQL_TConnect(IPluginContext *pContext, const cell_t *params)
{
IPluginFunction *pf = pContext->GetFunctionById(params[1]);
if (!pf)
{
return pContext->ThrowNativeError("Function id %x is invalid", params[1]);
}
char *conf;
pContext->LocalToString(params[2], &conf);
IDBDriver *driver = NULL;
const DatabaseInfo *pInfo = g_DBMan.FindDatabaseConf(conf);
char error[255];
if (pInfo != NULL)
{
if (pInfo->driver[0] == '\0')
{
driver = g_DBMan.GetDefaultDriver();
} else {
driver = g_DBMan.FindOrLoadDriver(pInfo->driver);
}
if (!driver)
{
UTIL_Format(error,
sizeof(error),
"Could not find driver \"%s\"",
pInfo->driver[0] == '\0' ? g_DBMan.GetDefaultDriverName() : pInfo->driver);
} else if (!driver->IsThreadSafe()) {
UTIL_Format(error,
sizeof(error),
"Driver \"%s\" is not thread safe!",
driver->GetIdentifier());
}
} else {
UTIL_Format(error, sizeof(error), "Could not find database conf \"%s\"", conf);
}
if (!pInfo || !driver)
{
pf->PushCell(BAD_HANDLE);
pf->PushCell(BAD_HANDLE);
pf->PushString(error);
pf->PushCell(0);
pf->Execute(NULL);
return 0;
}
/* HACK! Add us to the dependency list */
CExtension *pExt = g_Extensions.GetExtensionFromIdent(driver->GetIdentity());
if (pExt)
{
g_Extensions.BindChildPlugin(pExt, g_PluginSys.FindPluginByContext(pContext->GetContext()));
}
/* Finally, add to the thread if we can */
TConnectOp *op = new TConnectOp(pf, driver, conf);
CPlugin *pPlugin = g_PluginSys.GetPluginByCtx(pContext->GetContext());
if (pPlugin->GetProperty("DisallowDBThreads", NULL)
|| !g_DBMan.AddToThreadQueue(op, PrioQueue_High))
{
/* Do everything right now */
op->RunThreadPart();
op->RunThinkPart();
}
return 1;
}
static cell_t SQL_ConnectEx(IPluginContext *pContext, const cell_t *params) static cell_t SQL_ConnectEx(IPluginContext *pContext, const cell_t *params)
{ {
IDBDriver *driver; IDBDriver *driver;
@ -380,6 +641,81 @@ static cell_t SQL_Query(IPluginContext *pContext, const cell_t *params)
return hndl; return hndl;
} }
static cell_t SQL_TQuery(IPluginContext *pContext, const cell_t *params)
{
IDatabase *db = NULL;
HandleError err;
if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db))
!= HandleError_None)
{
return pContext->ThrowNativeError("Invalid database Handle %x (error: %d)", params[1], err);
}
IPluginFunction *pf = pContext->GetFunctionById(params[2]);
if (!pf)
{
return pContext->ThrowNativeError("Function id %x is invalid", params[2]);
}
char *query;
pContext->LocalToString(params[3], &query);
cell_t data = params[4];
PrioQueueLevel level = PrioQueue_Normal;
if (params[5] == (cell_t)PrioQueue_High)
{
level = PrioQueue_High;
} else if (params[5] == (cell_t)PrioQueue_Low) {
level = PrioQueue_Low;
}
CPlugin *pPlugin = g_PluginSys.GetPluginByCtx(pContext->GetContext());
TQueryOp *op = new TQueryOp(db, pf, query, data);
if (pPlugin->GetProperty("DisallowDBThreads", NULL)
|| !g_DBMan.AddToThreadQueue(op, level))
{
/* Do everything right now */
op->RunThreadPart();
op->RunThinkPart();
}
return 1;
}
static cell_t SQL_LockDatabase(IPluginContext *pContext, const cell_t *params)
{
IDatabase *db = NULL;
HandleError err;
if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db))
!= HandleError_None)
{
return pContext->ThrowNativeError("Invalid database Handle %x (error: %d)", params[1], err);
}
db->LockForFullAtomicOperation();
return 1;
}
static cell_t SQL_UnlockDatabase(IPluginContext *pContext, const cell_t *params)
{
IDatabase *db = NULL;
HandleError err;
if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db))
!= HandleError_None)
{
return pContext->ThrowNativeError("Invalid database Handle %x (error: %d)", params[1], err);
}
db->UnlockFromFullAtomicOperation();
return 1;
}
static cell_t SQL_PrepareQuery(IPluginContext *pContext, const cell_t *params) static cell_t SQL_PrepareQuery(IPluginContext *pContext, const cell_t *params)
{ {
IDatabase *db = NULL; IDatabase *db = NULL;
@ -838,6 +1174,26 @@ static cell_t SQL_Execute(IPluginContext *pContext, const cell_t *params)
return stmt->Execute() ? 1 : 0; return stmt->Execute() ? 1 : 0;
} }
static cell_t SQL_IsSameConnection(IPluginContext *pContext, const cell_t *params)
{
IDatabase *db1=NULL, *db2=NULL;
HandleError err;
if ((err = g_DBMan.ReadHandle(params[1], DBHandle_Database, (void **)&db1))
!= HandleError_None)
{
return pContext->ThrowNativeError("Invalid database Handle 1/%x (error: %d)", params[1], err);
}
if ((err = g_DBMan.ReadHandle(params[2], DBHandle_Database, (void **)&db2))
!= HandleError_None)
{
return pContext->ThrowNativeError("Invalid database Handle 2/%x (error: %d)", params[2], err);
}
return (db1 == db2) ? true : false;
}
REGISTER_NATIVES(dbNatives) REGISTER_NATIVES(dbNatives)
{ {
{"SQL_BindParamInt", SQL_BindParamInt}, {"SQL_BindParamInt", SQL_BindParamInt},
@ -865,10 +1221,15 @@ REGISTER_NATIVES(dbNatives)
{"SQL_GetRowCount", SQL_GetRowCount}, {"SQL_GetRowCount", SQL_GetRowCount},
{"SQL_HasResultSet", SQL_HasResultSet}, {"SQL_HasResultSet", SQL_HasResultSet},
{"SQL_IsFieldNull", SQL_IsFieldNull}, {"SQL_IsFieldNull", SQL_IsFieldNull},
{"SQL_IsSameConnection", SQL_IsSameConnection},
{"SQL_LockDatabase", SQL_LockDatabase},
{"SQL_MoreRows", SQL_MoreRows}, {"SQL_MoreRows", SQL_MoreRows},
{"SQL_PrepareQuery", SQL_PrepareQuery},
{"SQL_Query", SQL_Query}, {"SQL_Query", SQL_Query},
{"SQL_QuoteString", SQL_QuoteString}, {"SQL_QuoteString", SQL_QuoteString},
{"SQL_PrepareQuery", SQL_PrepareQuery},
{"SQL_Rewind", SQL_Rewind}, {"SQL_Rewind", SQL_Rewind},
{"SQL_TConnect", SQL_TConnect},
{"SQL_TQuery", SQL_TQuery},
{"SQL_UnlockDatabase", SQL_UnlockDatabase},
{NULL, NULL}, {NULL, NULL},
}; };

View File

@ -31,6 +31,7 @@
#include "TimerSys.h" #include "TimerSys.h"
#include "MenuStyle_Valve.h" #include "MenuStyle_Valve.h"
#include "MenuStyle_Radio.h" #include "MenuStyle_Radio.h"
#include "Database.h"
SH_DECL_HOOK6(IServerGameDLL, LevelInit, SH_NOATTRIB, false, bool, const char *, const char *, const char *, const char *, bool, bool); SH_DECL_HOOK6(IServerGameDLL, LevelInit, SH_NOATTRIB, false, bool, const char *, const char *, const char *, const char *, bool, bool);
SH_DECL_HOOK0_void(IServerGameDLL, LevelShutdown, SH_NOATTRIB, false); SH_DECL_HOOK0_void(IServerGameDLL, LevelShutdown, SH_NOATTRIB, false);
@ -337,6 +338,8 @@ void SimulateTick()
void SourceModBase::GameFrame(bool simulating) void SourceModBase::GameFrame(bool simulating)
{ {
g_DBMan.RunFrame();
/** /**
* Note: This is all hardcoded rather than delegated to save * Note: This is all hardcoded rather than delegated to save
* precious CPU cycles. * precious CPU cycles.

View File

@ -56,7 +56,27 @@ enum DBResult
DBVal_TypeMismatch = 1, /**< You cannot retrieve this data with this type. */ DBVal_TypeMismatch = 1, /**< You cannot retrieve this data with this type. */
DBVal_Null = 2, /**< Field has no data (NULL) */ DBVal_Null = 2, /**< Field has no data (NULL) */
DBVal_Data = 3, /**< Field has data */ DBVal_Data = 3, /**< Field has data */
} };
/**
* Describes binding types.
*/
enum DBBindType
{
DBBind_Int = 0, /**< Bind an integer. */
DBBind_Float = 1, /**< Bind a float. */
DBBind_String = 2, /**< Bind a string. */
};
/**
* Threading priority level.
*/
enum DBPriority
{
DBPrio_High = 0, /**< High priority. */
DBPrio_Normal = 1, /**< Normal priority. */
DBPrio_Low = 2, /**< Low priority. */
};
/** /**
* Creates an SQL connection from a named configuration. * Creates an SQL connection from a named configuration.
@ -452,3 +472,91 @@ native SQL_BindParamString(Handle:statement, param, const String:value[], bool:c
* @error Invalid statement Handle. * @error Invalid statement Handle.
*/ */
native bool:SQL_Execute(Handle:statement); native bool:SQL_Execute(Handle:statement);
/**
* Locks a database so threading operations will not interrupt.
*
* If you are using a database Handle for both threading and non-threading,
* this MUST be called before doing any set of non-threading DB operations.
* Otherwise you risk corrupting the database driver's memory or network
* connection.
*
* Leaving a lock on a database and then executing a threaded query results
* in a dead lock! Make sure to call SQL_UnlockDatabase()!
*
* If the lock cannot be acquired, the main thread will pause until the
* threaded operation has concluded.
*
* @param database A database Handle.
* @noreturn
* @error Invalid database Handle.
*/
native SQL_LockDatabase(Handle:database);
/**
* Unlocks a database so threading operations may continue.
*
* @param database A database Handle.
* @noreturn
* @error Invalid database Handle.
*/
native SQL_UnlockDatabase(Handle:database);
/**
* General callback for threaded SQL stuff.
*
* @param db Parent object of the Handle (or INVALID_HANDLE if none).
* @param hndl Handle to the child object (or INVALID_HANDLE if none).
* @param error Error string, if any.
* @param
*/
functag SQLTCallback public(Handle:owner, Handle:hndl, const String:error[], any:data);
/**
* Tells whether two database handles both point to the same database
* connection.
*
* @param hndl1 First database Handle.
* @param hndl2 Second database Handle.
* @return True if the Handles point to the same
* connection, false otherwise.
* @error Invalid Handle.
*/
native bool:SQL_IsSameConnection(Handle:hndl1, Handle:hndl2);
/**
* Connects to a database via a thread. This can be used instead of
* SQL_Connect() if you wish for non-blocking functionality.
*
* It is not necessary to use this to use threaded queries. However, if you
* don't (or you mix threaded/non-threaded queries), you should see
* SQL_LockDatabase().
*
* @param callback Callback; new Handle will be in hndl, owner is the driver.
* If no driver was found, the owner is INVALID_HANDLE.
* @param name Database name.
* @noreturn
*/
native SQL_TConnect(SQLTCallback:callback, const String:name[]="default");
/**
* Executes a simple query via a thread. The query Handle is passed through
* the callback.
*
* The database Handle returned through the callback is always a new Handle,
* and if necessary, SQL_IsSameConnection() should be used to test against
* other conenctions.
*
* The query Handle returned through the callback is temporary and destroyed
* at the end of the callback. If you need to hold onto it, use CloneHandle().
*
* @param database A database Handle.
* @param callback Callback; database is in "owner" and the query Handle
* is passed in "hndl".
* @param query Query string.
* @param data Extra data value to pass to the callback.
* @param prio Priority queue to use.
* @noreturn
* @error Invalid database Handle.
*/
native SQL_TQuery(Handle:database, SQLTCallback:callback, const String:query[], any:data=0, DBPriority:prio=DBPrio_Normal);

View File

@ -13,6 +13,9 @@ public OnPluginStart()
{ {
RegServerCmd("sql_test_normal", Command_TestSql1) RegServerCmd("sql_test_normal", Command_TestSql1)
RegServerCmd("sql_test_stmt", Command_TestSql2) RegServerCmd("sql_test_stmt", Command_TestSql2)
RegServerCmd("sql_test_thread1", Command_TestSql3)
RegServerCmd("sql_test_thread2", Command_TestSql4)
RegServerCmd("sql_test_thread3", Command_TestSql5)
} }
PrintQueryData(Handle:query) PrintQueryData(Handle:query)
@ -105,4 +108,71 @@ public Action:Command_TestSql2(args)
return Plugin_Handled; return Plugin_Handled;
} }
new Handle:g_ThreadedHandle = INVALID_HANDLE;
public CallbackTest3(Handle:owner, Handle:hndl, const String:error[], any:data)
{
PrintToServer("CallbackTest1() (owner %x) (hndl %x) (error \"%s\") (data %d)", owner, hndl, error, data);
if (g_ThreadedHandle != INVALID_HANDLE && hndl != INVALID_HANDLE)
{
CloseHandle(hndl);
} else {
g_ThreadedHandle = hndl;
}
}
public Action:Command_TestSql3(args)
{
if (g_ThreadedHandle != INVALID_HANDLE)
{
PrintToServer("A threaded connection already exists, run the next test");
return Plugin_Handled;
}
new String:name[32];
GetCmdArg(1, name, sizeof(name));
SQL_TConnect(CallbackTest3, name);
return Plugin_Handled;
}
public Action:Command_TestSql4(args)
{
SQL_LockDatabase(g_ThreadedHandle);
new Handle:query = SQL_Query(g_ThreadedHandle, "SELECT * FROM gaben")
if (query == INVALID_HANDLE)
{
new String:error[255];
SQL_GetError(g_ThreadedHandle, error, sizeof(error))
PrintToServer("Failed to query: %s", error)
} else {
PrintQueryData(query)
CloseHandle(query)
}
SQL_UnlockDatabase(g_ThreadedHandle);
return Plugin_Handled;
}
public CallbackTest5(Handle:owner, Handle:hndl, const String:error[], any:data)
{
if (hndl == INVALID_HANDLE)
{
PrintToServer("Failed to query: %s", error)
} else {
PrintQueryData(hndl)
}
}
public Action:Command_TestSql5(args)
{
SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52)
SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52)
SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52)
SQL_TQuery(g_ThreadedHandle, CallbackTest5, "SELECT * FROM gaben", 52)
return Plugin_Handled;
}