/**
 * vim: set ts=4 sw=4 tw=99 noet :
 * =============================================================================
 * SourceMod
 * Copyright (C) 2004-2009 AlliedModders LLC.  All rights reserved.
 * =============================================================================
 *
 * This program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License, version 3.0, as published by the
 * Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, AlliedModders LLC gives you permission to link the
 * code of this program (as well as its derivative works) to "Half-Life 2," the
 * "Source Engine," the "SourcePawn JIT," and any Game MODs that run on software
 * by the Valve Corporation.  You must obey the GNU General Public License in
 * all respects for all other code used.  Additionally, AlliedModders LLC grants
 * this exception to all derivative works.  AlliedModders LLC defines further
 * exceptions, found in LICENSE.txt (as of this writing, version JULY-31-2007),
 * or <http://www.sourcemod.net/license.php>.
* * Version: $Id$ */ #include #include #include #include #include #include #include #include #include #include #include "BaseWorker.h" #include "ThreadSupport.h" #include "common_logic.h" static constexpr unsigned int DEFAULT_THINK_TIME_MS = 20; class CompatWorker final : public IThreadWorker { public: explicit CompatWorker(IThreadWorkerCallbacks* callbacks); ~CompatWorker(); void MakeThread(IThread *pThread) override; IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags) override; IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params) override; void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) override; unsigned int RunFrame() override; bool Pause() override; bool Unpause() override; bool Start() override; bool Stop(bool flush) override; WorkerState GetStatus(unsigned int *numThreads) override; void SetMaxThreadsPerFrame(unsigned int threads) override; void SetThinkTimePerFrame(unsigned int thinktime) override; private: void Flush(); void Worker(); void RunWork(SWThreadHandle* handle); void RunWorkLocked(std::unique_lock* lock, SWThreadHandle* handle); private: IThreadWorkerCallbacks* callbacks_; WorkerState state_; std::mutex mutex_; std::condition_variable work_cv_; std::deque work_; std::unique_ptr thread_; std::atomic jobs_per_wakeup_; std::atomic wait_between_jobs_; }; CompatWorker::CompatWorker(IThreadWorkerCallbacks* callbacks) : callbacks_(callbacks), state_(Worker_Stopped), jobs_per_wakeup_(SM_DEFAULT_THREADS_PER_FRAME), wait_between_jobs_(DEFAULT_THINK_TIME_MS) { } CompatWorker::~CompatWorker() { Stop(false /* ignored */); Flush(); } bool CompatWorker::Start() { std::lock_guard lock(mutex_); if (state_ != Worker_Stopped) return false; thread_ = ke::NewThread("SM CompatWorker Thread", [this]() -> void { Worker(); }); state_ = Worker_Running; return true; } bool CompatWorker::Stop(bool) { { std::lock_guard lock(mutex_); if (state_ <= Worker_Stopped) return false; state_ = Worker_Stopped; work_cv_.notify_all(); } 
thread_->join(); thread_ = nullptr; Flush(); return true; } bool CompatWorker::Pause() { std::lock_guard lock(mutex_); if (state_ != Worker_Running) return false; state_ = Worker_Paused; work_cv_.notify_all(); return true; } bool CompatWorker::Unpause() { std::lock_guard lock(mutex_); if (state_ != Worker_Paused) return false; state_ = Worker_Running; work_cv_.notify_all(); return true; } void CompatWorker::Flush() { while (!work_.empty()) { auto handle = ke::PopFront(&work_); handle->GetThread()->OnTerminate(handle, true); if (handle->m_params.flags & Thread_AutoRelease) delete handle; } } void CompatWorker::Worker() { // Note: this must be first to ensure an ordering between Worker() and // Start(). It must also be outside of the loop to ensure the lock is // held across wakeup and retesting the predicates. std::unique_lock lock(mutex_); if (callbacks_) { lock.unlock(); callbacks_->OnWorkerStart(this); lock.lock(); } typedef std::chrono::system_clock Clock; typedef std::chrono::time_point TimePoint; auto can_work = [this]() -> bool { return state_ == Worker_Running && !work_.empty(); }; ke::Maybe wait; unsigned int work_in_frame = 0; for (;;) { if (state_ == Worker_Stopped) break; if (!can_work()) { // Wait for work or a Stop. work_cv_.wait(lock); continue; } if (wait.isValid()) { // Wait until the specified time has passed. If we wake up with a // timeout, then the wait has elapsed, so reset the holder. if (work_cv_.wait_until(lock, wait.get()) == std::cv_status::timeout) wait = ke::Nothing(); continue; } assert(state_ == Worker_Running); assert(!work_.empty()); SWThreadHandle* handle = ke::PopFront(&work_); RunWorkLocked(&lock, handle); work_in_frame++; // If we've reached our max jobs per "frame", signal that the next // immediate job must be delayed. We retain the old ThreadWorker // behavior by checking if the queue has more work. Thus, a delay // only occurs if two jobs would be processed in the same wakeup. 
if (work_in_frame >= jobs_per_wakeup_ && wait_between_jobs_ && can_work()) wait = ke::Some(Clock::now() + std::chrono::milliseconds(wait_between_jobs_)); } assert(lock.owns_lock()); while (!work_.empty()) { SWThreadHandle* handle = ke::PopFront(&work_); RunWorkLocked(&lock, handle); } } unsigned int CompatWorker::RunFrame() { unsigned int nprocessed = 0; for (unsigned int i = 1; i <= jobs_per_wakeup_; i++) { SWThreadHandle* handle; { std::lock_guard lock(mutex_); if (work_.empty()) break; handle = ke::PopFront(&work_); } RunWork(handle); nprocessed++; } return nprocessed; } void CompatWorker::RunWorkLocked(std::unique_lock* lock, SWThreadHandle* handle) { lock->unlock(); RunWork(handle); lock->lock(); } void CompatWorker::RunWork(SWThreadHandle* handle) { bool autorelease = !!(handle->m_params.flags & Thread_AutoRelease); handle->m_state = Thread_Running; handle->GetThread()->RunThread(handle); handle->m_state = Thread_Done; handle->GetThread()->OnTerminate(handle, false); if (autorelease) delete handle; } void CompatWorker::MakeThread(IThread *pThread) { ThreadParams params; params.flags = Thread_AutoRelease; MakeThread(pThread, ¶ms); } IThreadHandle *CompatWorker::MakeThread(IThread *pThread, ThreadFlags flags) { ThreadParams params; params.flags = flags; return MakeThread(pThread, ¶ms); } IThreadHandle *CompatWorker::MakeThread(IThread *pThread, const ThreadParams *params) { std::lock_guard lock(mutex_); ThreadParams def_params; if (!params) params = &def_params; if (state_ <= Worker_Stopped) return nullptr; SWThreadHandle* handle = new SWThreadHandle(this, params, pThread); work_.push_back(handle); work_cv_.notify_one(); return handle; } void CompatWorker::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) { min = ThreadPrio_Normal; max = ThreadPrio_Normal; } void CompatWorker::SetMaxThreadsPerFrame(unsigned int threads) { jobs_per_wakeup_ = threads; } void CompatWorker::SetThinkTimePerFrame(unsigned int thinktime) { wait_between_jobs_ = thinktime; } 
WorkerState CompatWorker::GetStatus(unsigned int *numThreads) { std::lock_guard lock(mutex_); // This number is meaningless and the status is racy. if (numThreads) *numThreads = jobs_per_wakeup_; return state_; } class CompatThread final : public IThreadHandle { public: CompatThread(IThread* callbacks, const ThreadParams* params); bool WaitForThread() override; void DestroyThis() override; IThreadCreator *Parent() override; void GetParams(ThreadParams *ptparams) override; ThreadPriority GetPriority() override; bool SetPriority(ThreadPriority prio) override; ThreadState GetState() override; bool Unpause() override; private: void Run(); private: IThread* callbacks_; ThreadParams params_; std::unique_ptr thread_; std::mutex mutex_; std::condition_variable check_cv_; std::atomic finished_; }; CompatThread::CompatThread(IThread* callbacks, const ThreadParams* params) : callbacks_(callbacks), params_(*params) { if (!(params_.flags & Thread_CreateSuspended)) Unpause(); } bool CompatThread::Unpause() { std::unique_lock lock(mutex_); if (thread_) return false; thread_ = ke::NewThread("SM CompatThread", [this]() -> void { Run(); }); return true; } void CompatThread::Run() { // Create an ordering between when the thread runs and when thread_ is assigned. std::unique_lock lock(mutex_); lock.unlock(); callbacks_->RunThread(this); finished_ = true; callbacks_->OnTerminate(this, false); if (params_.flags & Thread_AutoRelease) { // There should be no handles outstanding, so it's safe to self-destruct. thread_->detach(); delete this; return; } lock.lock(); callbacks_ = nullptr; check_cv_.notify_all(); } bool CompatThread::WaitForThread() { std::unique_lock lock(mutex_); for (;;) { // When done, callbacks are unset. If paused, this will deadlock. if (!callbacks_) break; check_cv_.wait(lock); } thread_->join(); return true; } ThreadState CompatThread::GetState() { std::unique_lock lock(mutex_); if (!thread_) return Thread_Paused; return finished_ ? 
Thread_Done : Thread_Running; } void CompatThread::DestroyThis() { delete this; } ThreadPriority CompatThread::GetPriority() { return ThreadPrio_Normal; } bool CompatThread::SetPriority(ThreadPriority prio) { return prio == ThreadPrio_Normal; } IThreadCreator *CompatThread::Parent() { return g_pThreader; } void CompatThread::GetParams(ThreadParams *ptparams) { *ptparams = params_; } class CompatMutex : public IMutex { public: bool TryLock() { return mutex_.try_lock(); } void Lock() { mutex_.lock(); } void Unlock() { mutex_.unlock(); } void DestroyThis() { delete this; } private: std::mutex mutex_; }; class CompatThreader final : public IThreader { public: void MakeThread(IThread *pThread) override; IThreadHandle *MakeThread(IThread *pThread, ThreadFlags flags) override; IThreadHandle *MakeThread(IThread *pThread, const ThreadParams *params) override; void GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) override; IMutex *MakeMutex() override; void ThreadSleep(unsigned int ms) override; IEventSignal *MakeEventSignal() override; IThreadWorker *MakeWorker(IThreadWorkerCallbacks *hooks, bool threaded) override; void DestroyWorker(IThreadWorker *pWorker) override; } sCompatThreader; void CompatThreader::MakeThread(IThread *pThread) { ThreadParams params; params.flags = Thread_AutoRelease; MakeThread(pThread, ¶ms); } IThreadHandle *CompatThreader::MakeThread(IThread *pThread, ThreadFlags flags) { ThreadParams params; params.flags = flags; return MakeThread(pThread, ¶ms); } IThreadHandle *CompatThreader::MakeThread(IThread *pThread, const ThreadParams *params) { ThreadParams def_params; if (!params) params = &def_params; return new CompatThread(pThread, params); } void CompatThreader::GetPriorityBounds(ThreadPriority &max, ThreadPriority &min) { min = ThreadPrio_Normal; max = ThreadPrio_Normal; } IMutex *CompatThreader::MakeMutex() { return new CompatMutex(); } void CompatThreader::ThreadSleep(unsigned int ms) { 
std::this_thread::sleep_for(std::chrono::milliseconds(ms)); } class CompatEventSignal final : public IEventSignal { public: void Wait() override { std::unique_lock lock(mutex_); cv_.wait(lock); } void Signal() override { std::lock_guard lock(mutex_); cv_.notify_all(); } void DestroyThis() override { delete this; } private: std::mutex mutex_; std::condition_variable cv_; }; IEventSignal *CompatThreader::MakeEventSignal() { return new CompatEventSignal(); } IThreadWorker *CompatThreader::MakeWorker(IThreadWorkerCallbacks *hooks, bool threaded) { if (!threaded) return new BaseWorker(hooks); return new CompatWorker(hooks); } void CompatThreader::DestroyWorker(IThreadWorker *pWorker) { delete pWorker; } IThreader *g_pThreader = &sCompatThreader; class RegThreadStuff : public SMGlobalClass { public: void OnSourceModAllInitialized() { sharesys->AddInterface(NULL, g_pThreader); } } s_RegThreadStuff;