//========= Copyright Valve Corporation, All rights reserved. ============// // // Purpose: // // $NoKeywords: $ //============================================================================= #include "stdafx.h" #include "tslist.h" #include #include #include "tier0/memdbgon.h" namespace GCSDK { IWorkThreadPoolSignal *CWorkThreadPool::sm_pWorkItemsCompletedSignal = NULL; //----------------------------------------------------------------------------- // Purpose: CWorkThread constructors //----------------------------------------------------------------------------- CWorkThread::CWorkThread( CWorkThreadPool *pThreadPool ) : m_pThreadPool( pThreadPool ), m_bExitThread( false ), m_bFinished( false ) { } CWorkThread::CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName ) : m_pThreadPool( pThreadPool ), m_bExitThread( false ), m_bFinished( false ) { SetName( pszName ); } //----------------------------------------------------------------------------- // Purpose: Tell work thread pool not to set event on every item added (SetEvent is very expensive) //----------------------------------------------------------------------------- void CWorkThreadPool::SetNeverSetEventOnAdd( bool bNeverSet ) { bool bWasSet = m_bNeverSetOnAdd; m_bNeverSetOnAdd = bNeverSet; // In case of disabling set right away to make sure if we have pending work we execute it now with no latency if ( bWasSet && !m_bNeverSetOnAdd ) m_EventNewWorkItem.Set(); } //----------------------------------------------------------------------------- // Purpose: performs the work loop for the thread, waits for work, // notifies the owner (the pool) as it completes work and before it exits //----------------------------------------------------------------------------- int CWorkThread::Run() { // manage our thread pool's statistics ++m_pThreadPool->m_cThreadsRunning; #ifdef _SERVER g_CompletionPortManager.AssociateCallingThreadWithIOCP(); #endif OnStart(); #if 0 // need to port over new vprof code #if defined( VPROF_ENABLED ) CVProfile *pProfile = GetVProfProfileForCurrentThread(); #endif #endif CWorkThreadPool *pPool = m_pThreadPool; int nIterations = 0; const int nMaxFastIterations = 4; while ( !m_bExitThread ) { #if 0 // game vprof doesn't yet support TLS'd vprof instances, until new vprof code is ported #if defined( VPROF_ENABLED ) if ( pProfile ) pProfile->MarkFrame( GetName() ); #endif #endif pPool->m_cActiveThreads++; nIterations = 0; while ( (pPool->BNeverSetEventOnAdd() && nIterations < nMaxFastIterations) || nIterations == 0 ) { // process any items which have arrived CWorkItem *pWorkItem = pPool->GetNextWorkItemToProcess( ); while ( pWorkItem ) { #if 0 pPool->m_StatWaitTime.Update( pWorkItem->WaitingTime() ); #endif if ( pWorkItem->HasTimedOut() ) { pWorkItem->m_bCanceled = true; } else { // call the work item to do its work pWorkItem->m_bCanceled = false; CFastTimer fastTimer; fastTimer.Start(); pWorkItem->m_bRunning = true; bool bSuccess = pWorkItem->ThreadProcess( this ); pWorkItem->m_bRunning = false; fastTimer.End(); CCycleCount cycleCount = fastTimer.GetDuration(); pWorkItem->SetCycleCount(cycleCount); #if 0 pPool->m_StatExecutionTime.Update( cycleCount.GetUlMicroseconds() ); #endif if ( bSuccess ) pPool->m_cSuccesses ++; else pWorkItem->m_bResubmit ? pPool->m_cRetries++ : pPool->m_cFailures++; } // do we need to resubmit this item? if ( pWorkItem->m_bResubmit ) { pWorkItem->m_bResubmit = false; pWorkItem->m_bCanceled = false; // put it at the tail of the incoming queue pPool->AddWorkItem( pWorkItem ); pWorkItem->Release(); // dec since AddWorkItem added 1 more again } else { // put it in the outgoing queue pPool->OnWorkItemCompleted( pWorkItem ); } // If we are flagged as exiting don't try to get more work, we need to exit right away and orphan the work // to avoid blocking shutdown. if ( !m_bExitThread ) { // get the next work item (if any) pWorkItem = pPool->GetNextWorkItemToProcess( ); } else { pWorkItem = NULL; } #if 0 // game vprof doesn't yet support TLS'd vprof instances, until new vprof code is ported #if defined( VPROF_ENABLED ) if ( pProfile && pWorkItem ) pProfile->MarkFrame( GetName() ); #endif #endif } if ( m_bExitThread ) break; ++nIterations; if ( pPool->BNeverSetEventOnAdd() && nIterations < nMaxFastIterations ) { VPROF_BUDGET( "CWorkThread -- Sleep", VPROF_BUDGETGROUP_SLEEPING ); ThreadSleep( 2 ); } } pPool->m_cActiveThreads--; // wait for a new work item to arrive in the queue, check the counts first just to be sure { VPROF_BUDGET( "CWorkThread -- Sleep", VPROF_BUDGETGROUP_SLEEPING ); #ifdef _SERVER if ( pPool->BNeverSetEventOnAdd() ) pPool->m_EventNewWorkItem.Wait( 15 ); else pPool->m_EventNewWorkItem.Wait( 50 ); #else pPool->m_EventNewWorkItem.Wait( 50 ); #endif } } // Since we are exiting, we must have been signaled to shutdown, and we should signal any remaining threads // since each signal wakes only one thread. pPool->m_EventNewWorkItem.Set(); m_bFinished = true; // updates stats --m_pThreadPool->m_cThreadsRunning; return EXIT_SUCCESS; } //----------------------------------------------------------------------------- // Purpose: Construct a new CWorkThreadPool object //----------------------------------------------------------------------------- CWorkThreadPool::CWorkThreadPool( const char *pszThreadName ) : #if 0 m_StatWaitTime( 100 ), m_StatExecutionTime( 100 ), #endif m_bThreadsInitialized( false ), m_cThreadsRunning( 0 ), m_cActiveThreads( 0 ), m_bMayHaveJobTimeouts( false ), m_bExiting( false ), m_bAutoCreateThreads( false ), m_cMaxThreads( 0 ), m_cFailures( 0 ), m_cSuccesses( 0 ), m_pWorkThreadConstructor( NULL ), m_ulLastCompletedSequenceNumber( 0 ), m_ulLastUsedSequenceNumber( 0 ), m_ulLastDispatchedSequenceNumber( 0 ), m_bEnsureOutputOrdering( false ), m_bNeverSetOnAdd( false ) { Assert( pszThreadName != NULL ); Q_strncpy( m_szThreadNamePfx, pszThreadName, sizeof( m_szThreadNamePfx ) ); m_LimitTimerCreateNewThreads.SetLimit( 1 ); m_pTSQueueToProcess = new CTSQueue< CWorkItem* >; m_pTSQueueCompleted = new CTSQueue< CWorkItem* >; } //----------------------------------------------------------------------------- // Purpose: destructor; does assertion checks to make sure we weere shut down cleanly // cleans up even if we weren't cleanly stopped //----------------------------------------------------------------------------- CWorkThreadPool::~CWorkThreadPool() { // If you hit this you probably didn't call StopWorkThreads() first AssertMsg1( ( !m_bThreadsInitialized || m_bExiting ) && 0 == m_cThreadsRunning, "CWorkThreadPool::~CWorkThreadPool(): Thread pool %s shutdown incorrectly.\n", m_szThreadNamePfx ); if ( m_WorkThreads.Count() ) { StopWorkThreads(); Assert( 0 == m_WorkThreads.Count() ); } Assert( 0 == m_cThreadsRunning ); // WARNING: We need to release any items left in the queues CWorkItem *pWorkItem = NULL; if ( m_pTSQueueCompleted->Count() > 0 ) { EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::~CWorkThreadPool: work complete queue not empty, %d items discarded.\n", m_pTSQueueCompleted->Count() ); pWorkItem = NULL; while ( m_pTSQueueCompleted->PopItem( &pWorkItem ) ) { while( pWorkItem->Release() ) { /* nothing */ } } } if ( m_pTSQueueToProcess->Count() > 0 ) { EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::~CWorkThreadPool: work processing queue not empty: %d items discarded.\n", m_pTSQueueToProcess->Count() ); while ( m_pTSQueueToProcess->PopItem( &pWorkItem ) ) { while( pWorkItem->Release() ) { /* nothing */ } } } delete m_pTSQueueToProcess; delete m_pTSQueueCompleted; } #if 0 //----------------------------------------------------------------------------- // Purpose: estimate the current backlog time using previous execution time, // the number of outstanding items, and the number of running threads //----------------------------------------------------------------------------- uint64 CWorkThreadPool::GetCurrentBacklogTime() const { if ( m_WorkThreads.Count() == 0 ) return 0; return ( m_pTSQueueToProcess->Count() * m_StatExecutionTime.GetUlAvg() ) / m_WorkThreads.Count(); } #endif int CWorkThreadPool::AddWorkThread( CWorkThread *pThread ) { AUTO_LOCK( m_WorkThreadMutex ); Assert( pThread ); return m_WorkThreads.AddToTail( pThread ); } void CWorkThreadPool::StartWorkThread( CWorkThread *pWorkThread, int iName ) { char rgchThreadName[32]; Q_snprintf( rgchThreadName, sizeof( rgchThreadName ), "%s:%d", m_szThreadNamePfx, iName ); pWorkThread->SetName( rgchThreadName ); if ( !pWorkThread->Start() ) EmitError( SPEW_THREADS, "CWorkThreadPool::StartWorkThread: Thread creation failed.\n" ); } void CWorkThreadPool::StartWorkThreads() { m_bThreadsInitialized = true; if ( 0 == m_WorkThreads.Count() ) { EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::StartWorkThreads: called with no threads in the pool, this is probably a bug.\n" ); return; } m_bExiting = false; m_cThreadsRunning = 0; AUTO_LOCK( m_WorkThreadMutex ); FOR_EACH_VEC( m_WorkThreads, i ) { StartWorkThread( m_WorkThreads[i], i ); } // XXX why? while ( m_cThreadsRunning == (uint) 0 ) { ThreadSleep( 1 ); } } //----------------------------------------------------------------------------- // Purpose: stops whatever work threads we're running // this must be called before the thread pool object is destroyed //----------------------------------------------------------------------------- void CWorkThreadPool::StopWorkThreads() { // indicate that we're shutting down; // don't accept more work in this thread m_bExiting = true; AUTO_LOCK( m_WorkThreadMutex ); FOR_EACH_VEC( m_WorkThreads, i ) { m_WorkThreads[i]->m_bExitThread = true; m_WorkThreads[i]->Cancel(); } // loop until all threads are dead while ( true ) { // This thread already holds the mutex; recursive try-lock should always succeed DbgVerify( BTryDeleteExitedWorkerThreads() ); if ( m_WorkThreads.Count() == 0 ) break; // Keep waking up threads until they're all dead. m_EventNewWorkItem.Set(); #ifdef _PS3 // call to abort any running call to gethostbyname(). // this is called over all the remaining work threads, while // waiting for the rest of the work threads to finish so that they won't // spuriously block on new calls to gethostbyname() as the // sys_net_abort_resolver call only stops the next call to the // network API, not any future calls. FOR_EACH_VEC( m_WorkThreads, iPS3 ) { // PS3 hack to abort gethostbyname() calls that may be blocking... sys_net_abort_resolver( m_WorkThreads[ iPS3 ]->GetThreadID(), SYS_NET_ABORT_STRICT_CHECK ); } #endif const uint k_uJoinTimeoutMillisec = 10000; // 10 seconds seems pretty arbitrary. CWorkThread *pWorkThread = m_WorkThreads[0]; bool bJoined = pWorkThread->Join( k_uJoinTimeoutMillisec ); if ( !bJoined ) { // Print thread id as a pointer for cross-platform compatibility EmitWarning( SPEW_THREADS, 2, "Thread \"%s\" (ID %p) failed to shut down", pWorkThread->GetName(), (void*)pWorkThread->GetThreadID() ); } else { // Succesful join means that the thread has terminated. if ( !pWorkThread->m_bFinished ) { // This would be a logic error in the thread proc if it ever tripped. AssertMsg( false, "pWorkThread->m_bFinished is false but thread is not running" ); // Recover by flagging the thread as potentially eligable for deletion, since it's dead. pWorkThread->m_bFinished = true; } } } Assert( m_WorkThreads.Count() == 0 && m_cThreadsRunning == (uint32) 0 ); } //----------------------------------------------------------------------------- // Purpose: sees if we have a non-zero number of work threads, // or a non-zero number of active threads //----------------------------------------------------------------------------- bool CWorkThreadPool::HasWorkItemsToProcess() const { return ( m_pTSQueueToProcess->Count() > 0 ) || ( m_cActiveThreads > 0 ); } //----------------------------------------------------------------------------- // Purpose: sets dynamic thread construction //----------------------------------------------------------------------------- void CWorkThreadPool::SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor ) { AUTO_LOCK( m_WorkThreadMutex ); m_bThreadsInitialized = true; m_bAutoCreateThreads = true; m_cMaxThreads = MAX( 1, cMaxThreads ); m_pWorkThreadConstructor = pWorkThreadConstructor; // If we have too many threads now, mark some to exit next time they loop. for ( int i = m_cMaxThreads; i < m_WorkThreads.Count(); i++ ) { m_WorkThreads[i]->m_bExitThread = true; } } //----------------------------------------------------------------------------- // Purpose: Adds a work item // Output: true if successful, // false if a low priority work item is not added due to a busy system // false if this work pool is shutting down and work isn't being accepted // NOTE: Adding normal priority items should always succeed //----------------------------------------------------------------------------- bool CWorkThreadPool::AddWorkItem( CWorkItem *pWorkItem ) { Assert( !m_bExiting ); if ( m_bExiting ) return false; if ( m_bEnsureOutputOrdering ) { AssertMsg( pWorkItem->m_bResubmit == false, "CWorkThreadPool can't support item auto resubmission when ensuring output ordering" ); } // if we're in auto-create mode, make sure we have enough threads running if ( m_bAutoCreateThreads && m_WorkThreads.Count() < m_cMaxThreads ) { int cPendingItems = m_pTSQueueToProcess->Count(); // we shouldn't get more than 12 items queued per already existing thread, otherwise we // want to create a new thread to help us keep up. if ( m_WorkThreads.Count() < 1 || m_WorkThreads.Count() * 12 < ( cPendingItems + 1 ) ) { if ( m_WorkThreads.Count() >= 2 && !m_LimitTimerCreateNewThreads.BLimitReached() ) { // Don't create more yet, we don't want to create them too fast } else { // create another thread CWorkThread *pWorkThread = NULL; if ( m_pWorkThreadConstructor ) { pWorkThread = m_pWorkThreadConstructor->CreateWorkerThread( this ); } else { pWorkThread = new CWorkThread( this ); } if( pWorkThread != NULL ) { int iName = AddWorkThread( pWorkThread ); StartWorkThread( pWorkThread, iName ); } m_LimitTimerCreateNewThreads.SetLimit( 250*k_nThousand ); } } } // // Do we actually have any threads ? If creating threads can fail, then maybe we don't ! // In that case, this WorkItem is not going to run ! // if ( m_WorkThreads.Count() == 0 ) { Assert(false); return false ; } // WARNING: We need to call pWorkItem AddRef() and Release() at all entry/exit points for the thread pool system. pWorkItem->AddRef(); pWorkItem->m_ulSequenceNumber = (++m_ulLastUsedSequenceNumber); m_pTSQueueToProcess->PushItem( pWorkItem ); if ( !BNeverSetEventOnAdd() && m_cActiveThreads == 0 ) { VPROF_BUDGET( "SetEvent()", VPROF_BUDGETGROUP_THREADINGMAIN ); m_EventNewWorkItem.Set(); } return true; } CWorkItem *CWorkThreadPool::GetNextCompletedWorkItem( ) { CWorkItem *pWorkItem = NULL; // Use a while loop just in case ref counts get screwed up and an item gets deleted when we release our reference to it while ( m_pTSQueueCompleted->PopItem( &pWorkItem ) ) { // WARNING: We need to call workitem AddRef() and Release() at all entry/exit points for the thread pool system. // Release() returns the current refcount of the object (after decrementing it by one) and should be non-zero unless the // the caller has released it already. if ( pWorkItem != NULL && pWorkItem->Release() > 0 ) { return pWorkItem; } } return NULL; } //----------------------------------------------------------------------------- // Purpose: gets the next work item to process. This non-blocking function // returns NULL immediately if there's nothing left in the queue. // otherwise, a pointer to the next CWorkItem. //----------------------------------------------------------------------------- CWorkItem *CWorkThreadPool::GetNextWorkItemToProcess( ) { CWorkItem *pWorkItem = NULL; if ( m_pTSQueueToProcess->Count() && m_pTSQueueToProcess->PopItem( &pWorkItem ) ) { return pWorkItem; } return NULL; } bool CWorkThreadPool::BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr ) { BTryDeleteExitedWorkerThreads(); CWorkItem *pWorkItem = GetNextCompletedWorkItem( ); while ( pWorkItem != NULL ) { uint64 ulSequenceNumber = pWorkItem->m_ulSequenceNumber; // NOTE: despite its name, this YIELDS - the target job // is resumed, and we resume here. if ( !pWorkItem->DispatchCompletedWorkItem( pJobMgr ) ) { EmitWarning( SPEW_THREADS, 2, "Work Item for Work Pool %s completed but job no longer existed to notify\n", m_szThreadNamePfx == NULL ? "UNKNOWN" :m_szThreadNamePfx ); AssertMsg1( m_bMayHaveJobTimeouts, "Work Item for Work Pool %s completed but job no longer existed to notify", m_szThreadNamePfx == NULL ? "UNKNOWN" :m_szThreadNamePfx ); } // pWorkItem was released by DispatchCompletedWorkItem m_ulLastDispatchedSequenceNumber = ulSequenceNumber; if ( limitTimer.BLimitReached() ) break; pWorkItem = GetNextCompletedWorkItem( ); } return ( GetCompletedWorkItemCount() > 0 ); } //----------------------------------------------------------------------------- // Purpose: delete any thread objects that have exited // we'll make sure the thread has actually ended; // if they haven't, they'll remain in the threads to delete list //----------------------------------------------------------------------------- bool CWorkThreadPool::BTryDeleteExitedWorkerThreads() { if ( m_WorkThreadMutex.TryLock() ) { if ( m_cThreadsRunning < (uint) m_WorkThreads.Count() ) { FOR_EACH_VEC_BACK( m_WorkThreads, i ) { CWorkThread *pWorkThread = m_WorkThreads[i]; if ( pWorkThread->m_bFinished && !pWorkThread->IsThreadRunning() ) { m_WorkThreads.FastRemove( i ); delete pWorkThread; } } } m_WorkThreadMutex.Unlock(); return true; } return false; } bool CWorkItem::DispatchCompletedWorkItem( CJobMgr *pJobMgr ) { // Check if this work item needs to signal a job if ( pJobMgr && k_GIDNil != m_JobID ) { if ( !pJobMgr->BRouteWorkItemCompletedIfExists( m_JobID, m_bCanceled ) ) return false; } else if ( k_GIDNil != m_JobID ) { // This should never happen since we have already released our reference to the work item // and the calling job should have released its ref when it exited AssertMsg( false, "CWorkItem::DispatchCompletedWorkItem: got a work item with no job ID" ); } return true; } //----------------------------------------------------------------------------- // Purpose: Called by the worker thread when it finishes an individual work item // This function will see if our work is meant to be well-ordred; if so, // it will do the necessary work to ensure ordering. // // It adds the item to the completed work item list so // the pool owner can retrieve it and checks to see if any threads // deserve to be shut down. //----------------------------------------------------------------------------- void CWorkThreadPool::OnWorkItemCompleted( CWorkItem *pWorkItem ) { if ( sm_pWorkItemsCompletedSignal != NULL ) sm_pWorkItemsCompletedSignal->Signal(); if ( !m_bEnsureOutputOrdering ) { // Since we aren't locking this sequence number could get screwed up a bit, but it's // pretty meaningless if ensure output ordering if off anyway... m_ulLastCompletedSequenceNumber = pWorkItem->m_ulSequenceNumber; m_pTSQueueCompleted->PushItem( pWorkItem ); } else { // In the ordered case we need to lock completely here since we'll be moving around between // various data structures and also need to ensure the ordering of items in the TS queue m_MutexOnItemCompletedOrdered.Lock(); if ( m_ulLastCompletedSequenceNumber + 1 == pWorkItem->m_ulSequenceNumber ) { m_ulLastCompletedSequenceNumber = pWorkItem->m_ulSequenceNumber; m_pTSQueueCompleted->PushItem( pWorkItem ); // We walk the vector multiple times, but it should be very short as items are likely to come in // close to in order, just mixed up a little if we have lots of threads or one item is much more // costly than others. bool bFoundNext = false; do { bFoundNext = false; FOR_EACH_VEC( m_vecCompletedAndWaiting, i ) { CWorkItem *pWaiting = m_vecCompletedAndWaiting[i]; if ( m_ulLastCompletedSequenceNumber + 1 == pWaiting->m_ulSequenceNumber ) { m_ulLastCompletedSequenceNumber = pWaiting->m_ulSequenceNumber; m_pTSQueueCompleted->PushItem( pWaiting ); m_vecCompletedAndWaiting.FastRemove( i ); bFoundNext = true; break; } } } while ( bFoundNext == true ); } else { m_vecCompletedAndWaiting.AddToTail( pWorkItem ); } m_MutexOnItemCompletedOrdered.Unlock(); } } //----------------------------------------------------------------------------- // Purpose: return the count of items we've queued to process //----------------------------------------------------------------------------- int CWorkThreadPool::GetWorkItemToProcessCount() const { return m_pTSQueueToProcess->Count(); } //----------------------------------------------------------------------------- // Purpose: return the count of items we've completed but not notified the consumer about //----------------------------------------------------------------------------- int CWorkThreadPool::GetCompletedWorkItemCount() const { int nCount = m_pTSQueueCompleted->Count(); return nCount; } #ifdef DBGFLAG_VALIDATE //----------------------------------------------------------------------------- // Purpose: Validates memory //----------------------------------------------------------------------------- void CWorkThreadPool::Validate( CValidator &validator, const char *pchName ) { VALIDATE_SCOPE(); AUTO_LOCK( m_WorkThreadMutex ); ValidateObj( m_WorkThreads ); FOR_EACH_VEC( m_WorkThreads, iWorkThread ) { m_WorkThreads[ iWorkThread ]->Suspend(); ValidatePtr( m_WorkThreads[ iWorkThread ] ); } ValidateAlignedPtr( m_pTSQueueCompleted ); ValidateAlignedPtr( m_pTSQueueToProcess ); ValidateObj( m_vecCompletedAndWaiting ); FOR_EACH_VEC( m_vecCompletedAndWaiting, j ) { ValidatePtr( m_vecCompletedAndWaiting.Element( j ) ); } FOR_EACH_VEC( m_WorkThreads, iWorkThread ) { m_WorkThreads[ iWorkThread ]->Resume(); } #if 0 ValidateObj( m_StatExecutionTime ); ValidateObj( m_StatWaitTime ); #endif } #endif // DBGFLAG_VALIDATE } // namespace GCSDK