sourcemod/core/logic/ThreadSupport.cpp
Peace-Maker 3c30f7b971 Fix crash when creating threads with Thread_AutoRelease
Setting the Thread_AutoRelease flag (default when using IThreader::MakeThread) caused a use-after-free after running the thread body.
2021-03-11 09:48:39 +01:00

554 lines
13 KiB
C++

/**
* vim: set ts=4 sw=4 tw=99 noet :
* =============================================================================
* SourceMod
* Copyright (C) 2004-2009 AlliedModders LLC. All rights reserved.
* =============================================================================
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 3.0, as published by the
* Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, AlliedModders LLC gives you permission to link the
* code of this program (as well as its derivative works) to "Half-Life 2," the
* "Source Engine," the "SourcePawn JIT," and any Game MODs that run on software
* by the Valve Corporation. You must obey the GNU General Public License in
* all respects for all other code used. Additionally, AlliedModders LLC grants
* this exception to all derivative works. AlliedModders LLC defines further
* exceptions, found in LICENSE.txt (as of this writing, version JULY-31-2007),
* or <http://www.sourcemod.net/license.php>.
*
* Version: $Id$
*/
#include <sm_platform.h>
#include <amtl/am-deque.h>
#include <amtl/am-maybe.h>
#include <amtl/am-thread.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include "BaseWorker.h"
#include "ThreadSupport.h"
#include "common_logic.h"
// Default pause, in milliseconds, that CompatWorker's constructor stores into
// wait_between_jobs_ (the throttle delay used by CompatWorker::Worker()).
static constexpr unsigned int DEFAULT_THINK_TIME_MS = 20;
// Threaded IThreadWorker. Jobs (SWThreadHandle) are appended to work_ by
// MakeThread() and executed in FIFO order by Worker(), which runs on the
// std::thread created in Start().
//
// Most methods take mutex_ before touching state_ or work_. Flush() does not;
// it is only called from Stop() (after the thread was joined) and from the
// destructor. The two throttle settings are atomics written without the lock.
class CompatWorker final : public IThreadWorker
{
public:
	explicit CompatWorker(IThreadWorkerCallbacks* callbacks);
	~CompatWorker();

	// Lifecycle.
	bool Start() override;
	bool Stop(bool flush) override;
	bool Pause() override;
	bool Unpause() override;
	WorkerState GetStatus(unsigned int *numThreads) override;

	// Job submission.
	void MakeThread(IThread *pThread) override;
	IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags) override;
	IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params) override;
	void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) override;

	// Manual pumping and throttle configuration.
	unsigned int RunFrame() override;
	void SetMaxThreadsPerFrame(unsigned int threads) override;
	void SetThinkTimePerFrame(unsigned int thinktime) override;

private:
	// Cancels everything still queued.
	void Flush();
	// Body of the worker thread.
	void Worker();
	// Runs a single job to completion on the calling thread.
	void RunWork(SWThreadHandle* handle);
	// Same as RunWork(), but releases *lock for the duration of the job.
	void RunWorkLocked(std::unique_lock<std::mutex>* lock, SWThreadHandle* handle);

	IThreadWorkerCallbacks* callbacks_;
	WorkerState state_;
	std::mutex mutex_;
	// Notified on every state change and on every newly queued job.
	std::condition_variable work_cv_;
	std::deque<SWThreadHandle*> work_;
	std::unique_ptr<std::thread> thread_;
	// Job count after which Worker() starts inserting a delay.
	std::atomic<unsigned int> jobs_per_wakeup_;
	// Length of that delay, in milliseconds; 0 disables it.
	std::atomic<unsigned int> wait_between_jobs_;
};
// Builds a worker in the Worker_Stopped state; no thread exists until Start().
// `callbacks` may be null (Worker() checks it before use).
CompatWorker::CompatWorker(IThreadWorkerCallbacks* callbacks)
: callbacks_(callbacks),
state_(Worker_Stopped),
jobs_per_wakeup_(SM_DEFAULT_THREADS_PER_FRAME),
wait_between_jobs_(DEFAULT_THINK_TIME_MS)
{
}
// Stops and joins the worker thread if it is still active, then cancels
// anything left in the queue.
CompatWorker::~CompatWorker()
{
Stop(false /* ignored */);
// Stop() returns early without flushing when the worker is not active, so
// flush unconditionally here.
Flush();
}
// Spawns the worker thread and switches to Worker_Running.
// Returns false (and does nothing) unless the worker is currently stopped.
bool CompatWorker::Start()
{
	std::lock_guard<std::mutex> guard(mutex_);

	if (state_ == Worker_Stopped) {
		// Worker() begins by acquiring mutex_, so it cannot make progress
		// until this function has finished updating thread_ and state_.
		thread_ = ke::NewThread("SM CompatWorker Thread", [this]() -> void {
			Worker();
		});
		state_ = Worker_Running;
		return true;
	}

	return false;
}
// Requests shutdown, waits for the worker thread to exit and cancels whatever
// is still queued afterwards. The flush argument is ignored.
// Returns false if the worker was not running or paused.
bool CompatWorker::Stop(bool)
{
	std::unique_lock<std::mutex> guard(mutex_);
	if (state_ <= Worker_Stopped)
		return false;

	state_ = Worker_Stopped;
	work_cv_.notify_all();

	// The lock must be released before joining: Worker() needs it to observe
	// the new state and to finish.
	guard.unlock();

	thread_->join();
	thread_.reset();

	Flush();
	return true;
}
// Running -> Paused. Returns false if the worker was not running.
bool CompatWorker::Pause()
{
	std::lock_guard<std::mutex> guard(mutex_);

	const bool was_running = (state_ == Worker_Running);
	if (was_running) {
		state_ = Worker_Paused;
		// Wake the worker thread so it re-evaluates its predicates.
		work_cv_.notify_all();
	}
	return was_running;
}
// Paused -> Running. Returns false if the worker was not paused.
bool CompatWorker::Unpause()
{
	std::lock_guard<std::mutex> guard(mutex_);

	const bool was_paused = (state_ == Worker_Paused);
	if (was_paused) {
		state_ = Worker_Running;
		// Wake the worker thread so it resumes draining the queue.
		work_cv_.notify_all();
	}
	return was_paused;
}
void CompatWorker::Flush()
{
while (!work_.empty()) {
auto handle = ke::PopFront(&work_);
handle->GetThread()->OnTerminate(handle, true);
if (handle->m_params.flags & Thread_AutoRelease)
delete handle;
}
}
// Body of the worker thread created by Start().
//
// Pops jobs off work_ one at a time and runs them with mutex_ released. Once
// jobs_per_wakeup_ jobs have been run back to back and more work is pending,
// the next job is delayed by wait_between_jobs_ milliseconds. When the state
// becomes Worker_Stopped the loop exits and the remaining queue is drained by
// running every job.
void CompatWorker::Worker()
{
	// Note: this must be first to ensure an ordering between Worker() and
	// Start(). It must also be outside of the loop to ensure the lock is
	// held across wakeup and retesting the predicates.
	std::unique_lock<std::mutex> lock(mutex_);

	if (callbacks_) {
		// User code is never invoked with mutex_ held.
		lock.unlock();
		callbacks_->OnWorkerStart(this);
		lock.lock();
	}

	// A monotonic clock keeps the throttle delay independent of wall-clock
	// adjustments.
	using Clock = std::chrono::steady_clock;
	using TimePoint = std::chrono::time_point<Clock>;

	auto can_work = [this]() -> bool {
		return state_ == Worker_Running && !work_.empty();
	};

	ke::Maybe<TimePoint> wait;
	unsigned int work_in_frame = 0;
	for (;;) {
		if (state_ == Worker_Stopped)
			break;

		if (!can_work()) {
			// Going idle (or being paused) ends the current burst of jobs, so
			// the next wakeup starts counting from zero again.
			work_in_frame = 0;

			// Wait for work or a Stop.
			work_cv_.wait(lock);
			continue;
		}

		if (wait.isValid()) {
			// Wait until the specified time has passed. If we wake up with a
			// timeout, then the wait has elapsed, so reset the holder and
			// begin a new "frame". Without resetting the counter, every job
			// after the first full frame would be throttled individually.
			if (work_cv_.wait_until(lock, wait.get()) == std::cv_status::timeout) {
				wait = ke::Nothing();
				work_in_frame = 0;
			}
			continue;
		}

		assert(state_ == Worker_Running);
		assert(!work_.empty());

		SWThreadHandle* handle = ke::PopFront(&work_);
		RunWorkLocked(&lock, handle);
		work_in_frame++;

		// If we've reached our max jobs per "frame", signal that the next
		// immediate job must be delayed. We retain the old ThreadWorker
		// behavior by checking if the queue has more work. Thus, a delay
		// only occurs if two jobs would be processed in the same wakeup.
		const unsigned int delay_ms = wait_between_jobs_;
		if (work_in_frame >= jobs_per_wakeup_ && delay_ms && can_work()) {
			const TimePoint deadline = Clock::now() + std::chrono::milliseconds(delay_ms);
			wait = ke::Some(deadline);
		}
	}

	assert(lock.owns_lock());

	// Stopping: run whatever is still queued, without any throttling.
	while (!work_.empty()) {
		SWThreadHandle* handle = ke::PopFront(&work_);
		RunWorkLocked(&lock, handle);
	}
}
unsigned int CompatWorker::RunFrame()
{
unsigned int nprocessed = 0;
for (unsigned int i = 1; i <= jobs_per_wakeup_; i++) {
SWThreadHandle* handle;
{
std::lock_guard<std::mutex> lock(mutex_);
if (work_.empty())
break;
handle = ke::PopFront(&work_);
}
RunWork(handle);
nprocessed++;
}
return nprocessed;
}
// Runs `handle` via RunWork() with *lock released for the duration of the
// job, then re-acquires it. On entry and on return *lock is held.
void CompatWorker::RunWorkLocked(std::unique_lock<std::mutex>* lock, SWThreadHandle* handle)
{
lock->unlock();
RunWork(handle);
lock->lock();
}
// Executes one job on the calling thread: RunThread(), then
// OnTerminate(handle, false), updating the handle's state around the call.
// Auto-release handles are deleted at the end.
void CompatWorker::RunWork(SWThreadHandle* handle)
{
	// Sample the flag up front so that the handle is not read again after
	// the callbacks have run.
	const bool autorelease = (handle->m_params.flags & Thread_AutoRelease) != 0;

	handle->m_state = Thread_Running;
	handle->GetThread()->RunThread(handle);

	handle->m_state = Thread_Done;
	handle->GetThread()->OnTerminate(handle, false);

	if (!autorelease)
		return;
	delete handle;
}
// Queues pThread as an auto-release job. The handle is not returned because
// RunWork()/Flush() delete auto-release handles themselves.
void CompatWorker::MakeThread(IThread *pThread)
{
ThreadParams params;
params.flags = Thread_AutoRelease;
MakeThread(pThread, &params);
}
// Queues pThread with the given flags; all other parameters keep the values
// set by ThreadParams' default construction. Forwards to the ThreadParams
// overload and returns its result.
IThreadHandle *CompatWorker::MakeThread(IThread *pThread, ThreadFlags flags)
{
ThreadParams params;
params.flags = flags;
return MakeThread(pThread, &params);
}
// Appends a new job to the queue and wakes the worker thread.
// A null `params` selects default-constructed ThreadParams.
// Returns the new handle, or nullptr when the worker is not running or paused.
IThreadHandle *CompatWorker::MakeThread(IThread *pThread, const ThreadParams *params)
{
	std::lock_guard<std::mutex> guard(mutex_);

	if (state_ <= Worker_Stopped)
		return nullptr;

	ThreadParams fallback;
	const ThreadParams* effective = params ? params : &fallback;

	SWThreadHandle* job = new SWThreadHandle(this, effective, pThread);
	work_.push_back(job);
	work_cv_.notify_one();
	return job;
}
// Only one priority is supported, so both bounds are ThreadPrio_Normal.
void CompatWorker::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min)
{
	// Assigns min first, then max.
	max = min = ThreadPrio_Normal;
}
// Sets the job count after which Worker() throttles, and the per-call cap of
// RunFrame(). Written atomically; mutex_ is not required.
void CompatWorker::SetMaxThreadsPerFrame(unsigned int threads)
{
	jobs_per_wakeup_.store(threads);
}
// Sets the throttle delay used by Worker(), in milliseconds. Zero disables
// the delay. Written atomically; mutex_ is not required.
void CompatWorker::SetThinkTimePerFrame(unsigned int thinktime)
{
	wait_between_jobs_.store(thinktime);
}
// Returns the current worker state. If numThreads is non-null it receives the
// configured jobs-per-wakeup value.
WorkerState CompatWorker::GetStatus(unsigned int *numThreads)
{
	std::lock_guard<std::mutex> guard(mutex_);

	// This number is meaningless and the status is racy.
	if (numThreads != nullptr)
		*numThreads = jobs_per_wakeup_.load();

	return state_;
}
// IThreadHandle backed by its own std::thread. The thread is created by
// Unpause(), which the constructor calls unless Thread_CreateSuspended is set,
// and executes Run().
class CompatThread final : public IThreadHandle
{
public:
	CompatThread(IThread* callbacks, const ThreadParams* params);

	// Control.
	bool Unpause() override;
	bool WaitForThread() override;
	void DestroyThis() override;

	// Queries.
	ThreadState GetState() override;
	IThreadCreator *Parent() override;
	void GetParams(ThreadParams *ptparams) override;
	ThreadPriority GetPriority() override;
	bool SetPriority(ThreadPriority prio) override;

private:
	// Thread entry point.
	void Run();

	// User callbacks; Run() resets this to null once a thread that is not
	// auto-released has finished.
	IThread* callbacks_;
	ThreadParams params_;
	// Null until Unpause() has started the thread.
	std::unique_ptr<std::thread> thread_;
	std::mutex mutex_;
	// Notified by Run() on completion; waited on by WaitForThread().
	std::condition_variable check_cv_;
	// Set by Run() after RunThread() returns; read by GetState().
	std::atomic<bool> finished_;
};
// Copies *params and, unless Thread_CreateSuspended is set, starts the thread
// immediately. `params` must not be null.
CompatThread::CompatThread(IThread* callbacks, const ThreadParams* params)
	: callbacks_(callbacks),
	  params_(*params),
	  // A default-constructed std::atomic<bool> holds an indeterminate value
	  // before C++20, and GetState() reads this flag, so give it a defined
	  // starting value.
	  finished_(false)
{
	if (!(params_.flags & Thread_CreateSuspended))
		Unpause();
}
// Starts the thread if that has not happened yet.
// Returns true if this call started it, false if it had been started before.
bool CompatThread::Unpause()
{
	std::unique_lock<std::mutex> guard(mutex_);

	const bool already_started = static_cast<bool>(thread_);
	if (!already_started) {
		// Run() acquires mutex_ first, so it cannot proceed before thread_
		// has been assigned.
		thread_ = ke::NewThread("SM CompatThread", [this]() -> void {
			Run();
		});
	}
	return !already_started;
}
// Entry point of the std::thread created by Unpause(): invokes the user's
// RunThread() and OnTerminate() callbacks, then either self-destructs
// (Thread_AutoRelease) or publishes completion for WaitForThread().
void CompatThread::Run()
{
// Create an ordering between when the thread runs and when thread_ is assigned.
// Unpause() assigns thread_ while holding mutex_, so acquiring it once here is
// enough; it is released again right away so the callbacks run unlocked.
std::unique_lock<std::mutex> lock(mutex_);
lock.unlock();
callbacks_->RunThread(this);
finished_ = true;
callbacks_->OnTerminate(this, false);
if (params_.flags & Thread_AutoRelease) {
// There should be no handles outstanding, so it's safe to self-destruct.
// Detach first: destroying a std::thread that is still joinable calls
// std::terminate.
thread_->detach();
delete this;
// Nothing below may run: every member, including mutex_, is gone. `lock`
// does not own the mutex at this point, so its destructor leaves it alone.
return;
}
// Publish completion under the lock; WaitForThread() treats a null
// callbacks_ as "done".
lock.lock();
callbacks_ = nullptr;
check_cv_.notify_all();
}
bool CompatThread::WaitForThread()
{
std::unique_lock<std::mutex> lock(mutex_);
for (;;) {
// When done, callbacks are unset. If paused, this will deadlock.
if (!callbacks_)
break;
check_cv_.wait(lock);
}
thread_->join();
return true;
}
// Thread_Paused until the thread has been started, Thread_Running while the
// thread body executes, Thread_Done once RunThread() has returned.
ThreadState CompatThread::GetState()
{
	std::unique_lock<std::mutex> guard(mutex_);

	if (thread_) {
		if (finished_)
			return Thread_Done;
		return Thread_Running;
	}
	return Thread_Paused;
}
void CompatThread::DestroyThis()
{
delete this;
}
// Always reports ThreadPrio_Normal; this implementation stores no priority.
ThreadPriority CompatThread::GetPriority()
{
return ThreadPrio_Normal;
}
// Priorities cannot be changed: the call succeeds only when the requested
// priority is ThreadPrio_Normal, and has no effect either way.
bool CompatThread::SetPriority(ThreadPriority prio)
{
	if (prio != ThreadPrio_Normal)
		return false;
	return true;
}
// Returns the global threader, g_pThreader, as the creator of this handle.
IThreadCreator *CompatThread::Parent()
{
return g_pThreader;
}
// Copies the parameters this thread was created with into *ptparams.
// A null ptparams is ignored.
void CompatThread::GetParams(ThreadParams *ptparams)
{
	// Tolerate a null out-pointer, as CompatWorker::GetStatus() does.
	if (ptparams)
		*ptparams = params_;
}
// IMutex implemented as a thin wrapper around std::mutex.
class CompatMutex : public IMutex
{
public:
// Attempts to acquire the mutex without blocking; returns the result of
// std::mutex::try_lock().
bool TryLock() {
return mutex_.try_lock();
}
// Blocks until the mutex is acquired.
void Lock() {
mutex_.lock();
}
// Releases the mutex.
void Unlock() {
mutex_.unlock();
}
// Deletes this object.
void DestroyThis() {
delete this;
}
private:
std::mutex mutex_;
};
// IThreader implementation that creates the Compat* objects defined in this
// file. sCompatThreader is its single file-scope instance.
class CompatThreader final : public IThreader
{
public:
	// Thread creation.
	void MakeThread(IThread *pThread) override;
	IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags) override;
	IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params) override;
	void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) override;

	// Synchronization primitives and utilities.
	IMutex *MakeMutex() override;
	IEventSignal *MakeEventSignal() override;
	void ThreadSleep(unsigned int ms) override;

	// Workers.
	IThreadWorker *MakeWorker(IThreadWorkerCallbacks *hooks, bool threaded) override;
	void DestroyWorker(IThreadWorker *pWorker) override;
} sCompatThreader;
// Starts pThread on a new auto-release thread. No handle is returned because
// CompatThread::Run() deletes an auto-release handle when the thread ends.
void CompatThreader::MakeThread(IThread *pThread)
{
ThreadParams params;
params.flags = Thread_AutoRelease;
MakeThread(pThread, &params);
}
// Creates a thread with the given flags; all other parameters keep the values
// set by ThreadParams' default construction. Forwards to the ThreadParams
// overload and returns its result.
IThreadHandle *CompatThreader::MakeThread(IThread *pThread, ThreadFlags flags)
{
ThreadParams params;
params.flags = flags;
return MakeThread(pThread, &params);
}
// Creates a CompatThread for pThread. A null `params` selects
// default-constructed ThreadParams. The parameters are copied by the
// CompatThread constructor, so the local fallback may go out of scope.
IThreadHandle *CompatThreader::MakeThread(IThread *pThread, const ThreadParams *params)
{
	ThreadParams fallback;
	const ThreadParams* effective = params ? params : &fallback;
	return new CompatThread(pThread, effective);
}
// Only one priority is supported, so both bounds are ThreadPrio_Normal.
void CompatThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min)
{
	// Assigns min first, then max.
	max = min = ThreadPrio_Normal;
}
// Allocates a new CompatMutex. The object frees itself in DestroyThis().
IMutex *CompatThreader::MakeMutex()
{
return new CompatMutex();
}
// Puts the calling thread to sleep for at least `ms` milliseconds.
void CompatThreader::ThreadSleep(unsigned int ms)
{
	const std::chrono::milliseconds pause(ms);
	std::this_thread::sleep_for(pause);
}
// IEventSignal built on a condition variable. Signal() releases every thread
// that is blocked in Wait() at that moment; a Signal() with no waiters is not
// remembered.
class CompatEventSignal final : public IEventSignal
{
public:
	// Blocks until the next Signal() call.
	void Wait() override {
		std::unique_lock<std::mutex> lock(mutex_);
		// condition_variable::wait() may return spuriously. Waiting for the
		// generation counter to move past the value seen on entry ensures
		// that only a real Signal() releases the caller.
		const unsigned int seen = generation_;
		cv_.wait(lock, [this, seen]() -> bool {
			return generation_ != seen;
		});
	}
	// Wakes every thread currently blocked in Wait().
	void Signal() override {
		std::lock_guard<std::mutex> lock(mutex_);
		++generation_;
		cv_.notify_all();
	}
	// Deletes this object.
	void DestroyThis() override {
		delete this;
	}
private:
	std::mutex mutex_;
	std::condition_variable cv_;
	// Number of Signal() calls so far; guarded by mutex_. Wrap-around is
	// harmless because only inequality is tested.
	unsigned int generation_ = 0;
};
// Allocates a new CompatEventSignal. The object frees itself in DestroyThis().
IEventSignal *CompatThreader::MakeEventSignal()
{
return new CompatEventSignal();
}
// Creates a worker: a CompatWorker when `threaded` is true, otherwise a
// BaseWorker. `hooks` is passed through to the worker's constructor.
IThreadWorker *CompatThreader::MakeWorker(IThreadWorkerCallbacks *hooks, bool threaded)
{
	if (threaded)
		return new CompatWorker(hooks);

	return new BaseWorker(hooks);
}
// Deletes a worker through its IThreadWorker pointer. For a CompatWorker the
// destructor stops the worker thread and flushes the queue.
void CompatThreader::DestroyWorker(IThreadWorker *pWorker)
{
delete pWorker;
}
// Global IThreader pointer, bound to the file-scope sCompatThreader instance.
IThreader *g_pThreader = &sCompatThreader;
// Global-class hook that registers g_pThreader with sharesys from
// OnSourceModAllInitialized().
class RegThreadStuff : public SMGlobalClass
{
public:
	void OnSourceModAllInitialized()
	{
		// nullptr rather than NULL, matching the rest of this file.
		sharesys->AddInterface(nullptr, g_pThreader);
	}
} s_RegThreadStuff;