//========= Copyright Valve Corporation, All rights reserved. ============// // // Purpose: A thread pool implementation. You give it CWorkItems, // it processes them asynchronously, and hands them back to you when they've // been completed. // // To declare a queue, provide the implementation of a CWorkItem subtype, // the thread name prefix for threads in the pool, and the number of work // threads you want. // // CNet uses this class to offload encryption to a separate thread, // so that's a good place to start looking for usage examples. // //============================================================================= #ifndef WORKTHREADPOOL_H #define WORKTHREADPOOL_H #ifdef _WIN32 #pragma once #endif #include #include #include "jobtime.h" // forward declaration for CTSQueue which we can't statically allocate as our member // because of alignment issues on Win64 template class CTSQueue; namespace GCSDK { // forward declarations class CWorkThread; class CJobMgr; // these functions return pointers to fixed string in the code section. We need this for VPROF nodes #define DECLARE_WORK_ITEM( classname ) \ virtual const char* GetDispatchCompletedName() const { return #classname"::DispatchCompleted"; } \ virtual const char* GetThreadProcessName() const { return #classname"::ThreadProcess"; } //----------------------------------------------------------------------------- // Purpose: Work item base class. Derive from this for specific work item types. // The derived type ideally should be self-contained with all data it // needs to perform the work. //----------------------------------------------------------------------------- class CWorkItem : public CRefCount { public: CWorkItem() : m_JobID( k_GIDNil ), m_bRunning( false ), m_bResubmit( false ), m_bCanceled( false ), m_ulSequenceNumber( 0 ) { m_jobtimeTimeout.SetLTime( 0 ); m_jobtimeQueued.SetToJobTime(); } CWorkItem( JobID_t jobID ) : m_JobID( jobID ), m_bRunning( false ), m_bResubmit( false ), m_bCanceled( false ), m_ulSequenceNumber( 0 ) { m_jobtimeTimeout.SetLTime( 0 ); m_jobtimeQueued.SetToJobTime(); } CWorkItem( JobID_t jobID, int64 cTimeoutMicroseconds ) : m_JobID( jobID ), m_bRunning( false ), m_bResubmit( false ), m_bCanceled( false ), m_ulSequenceNumber( 0 ) { SetPreExecuteTimeout( cTimeoutMicroseconds ); m_jobtimeQueued.SetToJobTime(); } void SetJobID( JobID_t jobID ) { Assert(jobID != k_GIDNil) ; m_JobID = jobID; } JobID_t GetJobID() const { return m_JobID; } bool HasTimedOut() const { return m_jobtimeTimeout.LTime() != 0 && m_jobtimeTimeout.CServerMicroSecsPassed() > 0; } int64 WaitingTime() const { return m_jobtimeQueued.CServerMicroSecsPassed(); } void SetPreExecuteTimeout( int64 cMicroSeconds ) { m_jobtimeTimeout.SetFromJobTime( cMicroSeconds ); } bool BPreExecuteTimeoutSet( ) const { return m_jobtimeTimeout.LTime() != 0; } void ForceTimeOut() { m_jobtimeTimeout.SetFromJobTime( -1 );} bool BIsRunning() const { return m_bRunning; } // true if running right now bool WasCancelled() const { return m_bCanceled; } void SetCycleCount( CCycleCount& cycleCount ) { m_CycleCount = cycleCount ; } CCycleCount GetCycleCount() { return m_CycleCount; } uint64 GetSequenceNumber() { return m_ulSequenceNumber; } // Work threads can call this to force a work item to be reprocessed (added to the end of the process queue) void SetResubmit( bool bResubmit ) { m_bResubmit = bResubmit; } // these functions return pointers to fixed string in the code section. // We need this for VPROF nodes, you must use the DECLARE_WORK_ITEM macro virtual const char* GetDispatchCompletedName() const = 0; virtual const char* GetThreadProcessName() const = 0; // Return false if your operation failed in some way that you would want to know about // The CWorkThreadPool will count the failures. virtual bool ThreadProcess( CWorkThread *pThread ) = 0; // called by the worker thread virtual bool DispatchCompletedWorkItem( CJobMgr *jobMgr ); // called by main loop after item completed #ifdef DBGFLAG_VALIDATE virtual void Validate( CValidator &validator, const char *pchName ) {} // Validate our internal structures #endif protected: // note: destructor is private. This is a ref-counted object, private destructor ensures callers can't accidentally delete // directly, or declare on stack virtual ~CWorkItem() { } friend class CWorkThread; friend class CWorkThreadPool; uint64 m_ulSequenceNumber; // Sequence number for the work item, used when enforcing output ordering as matching input order CCycleCount m_CycleCount; // A record of how long it took to execute this particular work item ! private: bool m_bResubmit; // true if the item should be resubmitted after last run volatile bool m_bRunning; // true if the work item is running right now bool m_bCanceled; // true if the work was canceled due to timeout CJobTime m_jobtimeTimeout; // time at which this result is no longer valid, so it shouldn't start to be processed CJobTime m_jobtimeQueued; JobID_t m_JobID; }; // forward decl class CWorkThreadPool; //----------------------------------------------------------------------------- // Purpose: Generic work thread implementation, to be specialized if necessary //----------------------------------------------------------------------------- class CWorkThread : public CThread { public: CWorkThread( CWorkThreadPool *pThreadPool ); CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName ); virtual ~CWorkThread() { } virtual int Run(); virtual void Cancel() { } protected: CWorkThreadPool *m_pThreadPool; // parent pool volatile bool m_bExitThread; // set by CWorkThreadPool::StopWorkerThreads and possibly by subclasses of CWorkThread volatile bool m_bFinished; // set by CWorkThread::Run [note: must still check IsThreadRunning, and/or call Join] virtual void OnStart() { } virtual void OnExit() { } #ifdef DBGFLAG_VALIDATE public: virtual void Validate( CValidator &validator, const char *pchName ) { VALIDATE_SCOPE(); }; #endif // DBGFLAG_VALIDATE friend class CWorkThreadPool; }; //----------------------------------------------------------------------------- // callback class to create work threads //----------------------------------------------------------------------------- class IWorkThreadFactory { public: virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) = 0; }; //----------------------------------------------------------------------------- // reusable trivial implementation of IWorkThreadFactory //----------------------------------------------------------------------------- template class CWorkThreadFactory : public IWorkThreadFactory { public: virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) { return new T( pWorkThreadPool ); } }; //----------------------------------------------------------------------------- // Purpose: interface class for object that the WorkThreadPool can signal when // there are completed work items to process //----------------------------------------------------------------------------- class IWorkThreadPoolSignal { public: virtual void Signal() = 0; }; //----------------------------------------------------------------------------- // Purpose: pool of work threads. //----------------------------------------------------------------------------- class CWorkThreadPool { friend class CWorkThread; public: static void SetWorkItemCompletedSignal( IWorkThreadPoolSignal *pObject ) { sm_pWorkItemsCompletedSignal = pObject; } CWorkThreadPool( const char *pszThreadNamePfx ); // eventually it might be nice to be able to resize these pools via console command // in that case, we'd want a constructor like this, and a PoolSize accessor/mutator pair // it makes this class much more complicated, however (growing the pool is easy, shrinking it // is less easy) so we'll punt for now. /* CWorkThreadPool( const char *pszName = "unnamed thread" ) : CWorkThreadPool( pszName, -1 ); */ virtual ~CWorkThreadPool(); // Setting this will ensure that items of the same priority complete and get dispatched in the same order // they are added to the threadpool. This has a small additional locking overhead and can increase latency // as items that are actually completed out-of-order have to queue waiting on earlier items. void SetEnsureOutputOrdering( bool bEnsureOutputOrdering ) { m_bEnsureOutputOrdering = bEnsureOutputOrdering; } void AllowTimeouts( bool bMayHaveJobTimeouts ) { m_bMayHaveJobTimeouts = bMayHaveJobTimeouts; } int AddWorkThread( CWorkThread *pThread ); void StartWorkThreads(); // gentlemen, start your engines void StopWorkThreads(); // stop work threads bool HasWorkItemsToProcess() const; // sets it to use dynamic worker thread construction // if pWorkThreadControl is NULL, just creates a standard CWorkThread object void SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor ); bool AddWorkItem( CWorkItem *pWorkItem ); // add a work item to the queue to process CWorkItem *GetNextCompletedWorkItem( ); // get next completed work item and it's priority if needed const char *GetThreadNamePrefix() const { return m_szThreadNamePfx; } void SetNeverSetEventOnAdd( bool bNeverSet ); bool BNeverSetEventOnAdd() { return m_bNeverSetOnAdd; } // get count of completed work items // can't be inline because of m_TSQueueCompleted type int GetCompletedWorkItemCount() const; // get count of work items to process // can't be inline because of m_TSQueueToProcess type int GetWorkItemToProcessCount() const; uint64 GetLastUsedSequenceNumber( ) const { return m_ulLastUsedSequenceNumber; } uint64 GetLastCompletedSequenceNumber( ) const { return m_ulLastCompletedSequenceNumber; } uint64 GetLastDispatchedSequenceNumber( ) const { return m_ulLastDispatchedSequenceNumber; } #if 0 uint64 GetAveExecutionTime() const { return m_StatExecutionTime.GetUlAvg(); } uint64 GetAveWaitTime() const { return m_StatWaitTime.GetUlAvg(); } uint64 GetCurrentBacklogTime() const; #endif int CountCompletedSuccess() const { return m_cSuccesses; } int CountRetries() const { return m_cRetries; } int CountCompletedFailed() const { return m_cFailures; } bool BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr ); bool BExiting() const { return m_bExiting; } int GetWorkerCount() const { return m_WorkThreads.Count(); } uint GetActiveThreadCount() const { return m_cActiveThreads; } // make sure you lock before using this const CWorkThread *GetWorkThread( int iIndex ) const { Assert( iIndex >= 0 && iIndex < m_WorkThreads.Count() ); return m_WorkThreads[iIndex]; } protected: // STATICS static IWorkThreadPoolSignal *sm_pWorkItemsCompletedSignal; // MEMBERS CWorkItem *GetNextWorkItemToProcess( ); void StartWorkThread( CWorkThread *pWorkThread, int iName ); // meaningful thread name prefix char m_szThreadNamePfx[32]; // have we actually initialized the threadpool? bool m_bThreadsInitialized; // Incoming queue: queue of all work items to process // must be dynamically allocated for alignment requirements on Win64 CTSQueue< CWorkItem *, false > *m_pTSQueueToProcess; // Outgoing queues: queue of all completed work items // must be dynamically allocated for alignment requirements on Win64 CTSQueue< CWorkItem *, false > *m_pTSQueueCompleted; // Vectors of completed, but out of order and waiting work items, only used when bEnsureOutputOrdering == true CThreadMutex m_MutexOnItemCompletedOrdered; CUtlVector< CWorkItem * > m_vecCompletedAndWaiting; // Should we emit work items in the same order they are received (on a per priority basis) bool m_bEnsureOutputOrdering; // Sequence numbers uint64 m_ulLastUsedSequenceNumber; uint64 m_ulLastCompletedSequenceNumber; uint64 m_ulLastDispatchedSequenceNumber; bool m_bMayHaveJobTimeouts; CUtlVector< CWorkThread * > m_WorkThreads; CThreadMutex m_WorkThreadMutex; CInterlockedUInt m_cThreadsRunning; // how many threads are running volatile bool m_bExiting; // are we exiting CThreadEvent m_EventNewWorkItem; // event set when a new work item is available to process CInterlockedInt m_cActiveThreads; volatile bool m_bNeverSetOnAdd; bool m_bAutoCreateThreads; int m_cMaxThreads; IWorkThreadFactory *m_pWorkThreadConstructor; // override this method if you want to do any special handling of completed work items. Default implementation puts // work items in our completed item queue. virtual void OnWorkItemCompleted( CWorkItem *pWorkItem ); bool BTryDeleteExitedWorkerThreads(); int m_cSuccesses; int m_cFailures; int m_cRetries; #if 0 CStat m_StatExecutionTime; CStat m_StatWaitTime; #endif CLimitTimer m_LimitTimerCreateNewThreads; #ifdef DBGFLAG_VALIDATE public: void Validate( CValidator &validator, const char *pchName ); #endif }; } // namespace GCSDK #endif // WORKTHREAD_H