//========= Copyright Valve Corporation, All rights reserved. ============// // // Purpose: // //============================================================================= #include "vmpi.h" #include "vmpi_distribute_work.h" #include "tier0/platform.h" #include "tier0/dbg.h" #include "utlvector.h" #include "utllinkedlist.h" #include "vmpi_dispatch.h" #include "pacifier.h" #include "vstdlib/random.h" #include "mathlib/mathlib.h" #include "threadhelpers.h" #include "threads.h" #include "tier1/strtools.h" #include "tier1/utlmap.h" #include "tier1/smartptr.h" #include "tier0/icommandline.h" #include "cmdlib.h" #include "vmpi_distribute_tracker.h" #include "vmpi_distribute_work_internal.h" #define DW_SUBPACKETID_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0) #define DW_SUBPACKETID_REQUEST_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+1) #define DW_SUBPACKETID_WUS_COMPLETED_LIST (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+2) // This is a pretty simple iterator. Basically, it holds a matrix of numbers. // Each row is assigned to a worker, and the worker just walks through his row. // // When a worker reaches the end of his row, it gets a little trickier. // They'll start doing their neighbor's row // starting at the back and continue on. At about this time, the master should reshuffle the // remaining work units to evenly distribute them amongst the workers. class CWorkUnitWalker { public: CWorkUnitWalker() { m_nWorkUnits = 0; } // This is all that's needed for it to start assigning work units. void Init( WUIndexType matrixWidth, WUIndexType matrixHeight, WUIndexType nWorkUnits ) { m_nWorkUnits = nWorkUnits; m_MatrixWidth = matrixWidth; m_MatrixHeight = matrixHeight; Assert( m_MatrixWidth * m_MatrixHeight >= nWorkUnits ); m_WorkerInfos.RemoveAll(); m_WorkerInfos.EnsureCount( m_MatrixHeight ); for ( int i=0; i < m_MatrixHeight; i++ ) { m_WorkerInfos[i].m_iStartWorkUnit = matrixWidth * i; m_WorkerInfos[i].m_iWorkUnitOffset = 0; } } // This is the main function of the shuffler bool GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex, bool *bWorkerFinishedHisColumn ) { if ( iWorker < 0 || iWorker >= m_WorkerInfos.Count() ) { Assert( false ); return false; } // If this worker has walked through all the work units, then he's done. CWorkerInfo *pWorker = &m_WorkerInfos[iWorker]; if ( pWorker->m_iWorkUnitOffset >= m_nWorkUnits ) return false; // If we've gone past the end of our work unit list, then we start at the BACK of the other rows of work units // in the hopes that we won't collide with the guy working there. We also should tell the master to reshuffle. WUIndexType iWorkUnitOffset = pWorker->m_iWorkUnitOffset; if ( iWorkUnitOffset >= m_MatrixWidth ) { WUIndexType xOffset = iWorkUnitOffset % m_MatrixWidth; WUIndexType yOffset = iWorkUnitOffset / m_MatrixWidth; xOffset = m_MatrixWidth - xOffset - 1; iWorkUnitOffset = yOffset * m_MatrixWidth + xOffset; *bWorkerFinishedHisColumn = true; } else { *bWorkerFinishedHisColumn = false; } *pWUIndex = (pWorker->m_iStartWorkUnit + iWorkUnitOffset) % m_nWorkUnits; ++pWorker->m_iWorkUnitOffset; return true; } private: class CWorkerInfo { public: WUIndexType m_iStartWorkUnit; WUIndexType m_iWorkUnitOffset; // Which work unit in my list of work units am I working on? }; WUIndexType m_nWorkUnits; WUIndexType m_MatrixWidth; WUIndexType m_MatrixHeight; CUtlVector m_WorkerInfos; }; class IShuffleRequester { public: virtual void RequestShuffle() = 0; }; // This is updated every time the master decides to reshuffle. // In-between shuffles, you can call NoteWorkUnitCompleted when a work unit is completed // and it'll avoid returning that work unit from GetNextWorkUnit again, but it WON'T class CShuffledWorkUnitWalker { public: void Init( WUIndexType nWorkUnits, IShuffleRequester *pRequester ) { m_iLastShuffleRequest = 0; m_iCurShuffle = 1; m_flLastShuffleTime = Plat_FloatTime(); m_pShuffleRequester = pRequester; int nBytes = PAD_NUMBER( nWorkUnits, 8 ) / 8; m_CompletedWUBits.SetSize( nBytes ); m_LocalCompletedWUBits.SetSize( nBytes ); for ( WUIndexType i=0; i < m_CompletedWUBits.Count(); i++ ) m_LocalCompletedWUBits[i] = m_CompletedWUBits[i] = 0; // Setup our list of work units remaining. for ( WUIndexType iWU=0; iWU < nWorkUnits; iWU++ ) { // Note: we're making an assumption here that if we add entries to a CUtlLinkedList in ascending order, their indices // will be ascending 1-by-1 as well. If that assumption breaks, we can create an extra array here to map WU indices to the linked list indices. WUIndexType index = m_WorkUnitsRemaining.AddToTail( iWU ); if ( index != iWU ) { Error( "CShuffledWorkUnitWalker: assumption on CUtlLinkedList indexing failed.\n" ); } } } void Shuffle( int nWorkers ) { if ( nWorkers == 0 ) return; ++m_iCurShuffle; m_flLastShuffleTime = Plat_FloatTime(); CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); m_WorkUnitsMap.RemoveAll(); m_WorkUnitsMap.EnsureCount( m_WorkUnitsRemaining.Count() ); // Here's the shuffle. The CWorkUnitWalker is going to walk each worker through its own group from 0-W, // and our job is to interleave it so when worker 0 goes [0,1,2] and worker 1 goes [100,101,102], they're actually // doing [0,N,2N] and [1,N+1,2N+1] where N=# of workers. // The grid is RxW long, and R*W is >= nWorkUnits. // R = # units per worker = width of the matrix // W = # workers = height of the matrix WUIndexType matrixHeight = nWorkers; WUIndexType matrixWidth = m_WorkUnitsRemaining.Count() / matrixHeight; if ( (m_WorkUnitsRemaining.Count() % matrixHeight) != 0 ) ++matrixWidth; Assert( matrixWidth * matrixHeight >= m_WorkUnitsRemaining.Count() ); WUIndexType iWorkUnit = 0; FOR_EACH_LL( m_WorkUnitsRemaining, i ) { WUIndexType xCoord = iWorkUnit / matrixHeight; WUIndexType yCoord = iWorkUnit % matrixHeight; Assert( xCoord < matrixWidth ); Assert( yCoord < matrixHeight ); m_WorkUnitsMap[yCoord*matrixWidth+xCoord] = m_WorkUnitsRemaining[i]; ++iWorkUnit; } m_Walker.Init( matrixWidth, matrixHeight, m_WorkUnitsRemaining.Count() ); } // Threadsafe. bool Thread_IsWorkUnitCompleted( WUIndexType iWU ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); return (val != 0); } WUIndexType Thread_NumWorkUnitsRemaining() { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); return m_WorkUnitsRemaining.Count(); } bool Thread_GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); while ( 1 ) { WUIndexType iUnmappedWorkUnit; bool bWorkerFinishedHisColumn; if ( !m_Walker.GetNextWorkUnit( iWorker, &iUnmappedWorkUnit, &bWorkerFinishedHisColumn ) ) return false; // If we've done all the work units assigned to us in the last shuffle, then request a reshuffle. if ( bWorkerFinishedHisColumn ) HandleWorkerFinishedColumn(); // Check the pending list. *pWUIndex = m_WorkUnitsMap[iUnmappedWorkUnit]; byte bIsCompleted = m_CompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); byte bIsCompletedLocally = m_LocalCompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); if ( !bIsCompleted && !bIsCompletedLocally ) return true; } } void HandleWorkerFinishedColumn() { if ( m_iLastShuffleRequest != m_iCurShuffle ) { double flCurTime = Plat_FloatTime(); if ( flCurTime - m_flLastShuffleTime > 2.0f ) { m_pShuffleRequester->RequestShuffle(); m_iLastShuffleRequest = m_iCurShuffle; } } } void Thread_NoteWorkUnitCompleted( WUIndexType iWU ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); if ( val == 0 ) { m_WorkUnitsRemaining.Remove( iWU ); m_CompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); } } void Thread_NoteLocalWorkUnitCompleted( WUIndexType iWU ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); m_LocalCompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); } CRC32_t GetShuffleCRC() { #ifdef _DEBUG static bool bCalcShuffleCRC = true; #else static bool bCalcShuffleCRC = VMPI_IsParamUsed( mpi_CalcShuffleCRC ); #endif if ( bCalcShuffleCRC ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); CRC32_t ret; CRC32_Init( &ret ); FOR_EACH_LL( m_WorkUnitsRemaining, i ) { WUIndexType iWorkUnit = m_WorkUnitsRemaining[i]; CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); } for ( int i=0; i < m_WorkUnitsMap.Count(); i++ ) { WUIndexType iWorkUnit = m_WorkUnitsMap[i]; CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); } CRC32_Final( &ret ); return ret; } else { return false; } } private: // These are PENDING WU completions until we call Shuffle() again, at which point we actually reorder the list // based on the completed WUs. CUtlVector m_CompletedWUBits; // Bit vector of completed WUs. CUtlLinkedList m_WorkUnitsRemaining; CUtlVector m_WorkUnitsMap; // Maps the 0-N indices in the CWorkUnitWalker to the list of remaining work units. // Helps us avoid some duplicates that happen during shuffling if we've completed some WUs and sent them // to the master, but the master hasn't included them in the DW_SUBPACKETID_WUS_COMPLETED_LIST yet. CUtlVector m_LocalCompletedWUBits; // Bit vector of completed WUs. // Used to control how frequently we request a reshuffle. unsigned int m_iCurShuffle; unsigned int m_iLastShuffleRequest; // The index of the shuffle we last requested a reshuffle on (don't request a reshuffle on the same one). double m_flLastShuffleTime; IShuffleRequester *m_pShuffleRequester; CWorkUnitWalker m_Walker; CCriticalSection m_CS; }; class CDistributor_SDKMaster : public IWorkUnitDistributorMaster, public IShuffleRequester { public: virtual void Release() { delete this; } static void Master_WorkerThread_Static( int iThread, void *pUserData ) { ((CDistributor_SDKMaster*)pUserData)->Master_WorkerThread( iThread ); } void Master_WorkerThread( int iThread ) { while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 && !g_bVMPIEarlyExit ) { WUIndexType iWU; if ( !m_WorkUnitWalker.Thread_GetNextWorkUnit( 0, &iWU ) ) { // Wait until there are some WUs to do. VMPI_Sleep( 10 ); continue; } // Do this work unit. m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); // We do this before it's completed because otherwise if a Shuffle() occurs, // the other thread might happen to pickup this work unit and we don't want that. m_pInfo->m_WorkerInfo.m_pProcessFn( iThread, iWU, NULL ); NotifyLocalMasterCompletedWorkUnit( iWU ); } } virtual void DistributeWork_Master( CDSInfo *pInfo ) { m_pInfo = pInfo; m_bForceShuffle = false; m_bShuffleRequested = false; m_flLastShuffleRequestServiceTime = Plat_FloatTime(); // Spawn idle-priority worker threads right here. m_bUsingMasterLocalThreads = (pInfo->m_WorkerInfo.m_pProcessFn != 0); if ( VMPI_IsParamUsed( mpi_NoMasterWorkerThreads ) ) { Msg( "%s found. No worker threads will be created.\n", VMPI_GetParamString( mpi_NoMasterWorkerThreads ) ); m_bUsingMasterLocalThreads = false; } m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); Shuffle(); if ( m_bUsingMasterLocalThreads ) RunThreads_Start( Master_WorkerThread_Static, this, k_eRunThreadsPriority_Idle ); uint64 lastShuffleTime = Plat_MSTime(); while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 ) { VMPI_DispatchNextMessage( 200 ); CheckLocalMasterCompletedWorkUnits(); VMPITracker_HandleDebugKeypresses(); if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) break; // Reshuffle the work units optimally every certain interval. if ( m_bForceShuffle || CheckShuffleRequest() ) { Shuffle(); lastShuffleTime = Plat_MSTime(); m_bForceShuffle = false; } } RunThreads_End(); } virtual void RequestShuffle() { m_bShuffleRequested = true; } bool CheckShuffleRequest() { if ( m_bShuffleRequested ) { double flCurTime = Plat_FloatTime(); if ( flCurTime - m_flLastShuffleRequestServiceTime > 2.0f ) // Only handle shuffle requests every so often. { m_flLastShuffleRequestServiceTime = flCurTime; m_bShuffleRequested = false; return true; } } return false; } void Shuffle() { // Build a list of who's working. CUtlVector whosWorking; if ( m_bUsingMasterLocalThreads ) { whosWorking.AddToTail( VMPI_MASTER_ID ); Assert( VMPI_MASTER_ID == 0 ); } { CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); for ( int i=0; i < pWorkersReady->m_WorkersReady.Count(); i++ ) { int iWorker = pWorkersReady->m_WorkersReady[i]; if ( VMPI_IsProcConnected( iWorker ) ) whosWorking.AddToTail( iWorker ); } m_WorkersReadyCS.Unlock(); } // Before sending the shuffle command, tell any of these active workers about the pending WUs completed. CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); m_WUSCompletedMessageBuffer.setLen( 0 ); if ( BuildWUsCompletedMessage( pWUsCompleted->m_Pending, m_WUSCompletedMessageBuffer ) > 0 ) { for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) { VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), whosWorking[i] ); } } pWUsCompleted->m_Completed.AddMultipleToTail( pWUsCompleted->m_Pending.Count(), pWUsCompleted->m_Pending.Base() ); // Add the pending ones to the full list now. pWUsCompleted->m_Pending.RemoveAll(); m_WUsCompletedCS.Unlock(); // Shuffle ourselves. m_WorkUnitWalker.Shuffle( whosWorking.Count() ); // Send the shuffle command to the workers. MessageBuffer mb; PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_SHUFFLE ); unsigned short nWorkers = whosWorking.Count(); mb.write( &nWorkers, sizeof( nWorkers ) ); CRC32_t shuffleCRC = m_WorkUnitWalker.GetShuffleCRC(); mb.write( &shuffleCRC, sizeof( shuffleCRC ) ); // Now for each worker, assign him an index in the shuffle and send the shuffle command. int workerIDPos = mb.getLen(); unsigned short id = 0; mb.write( &id, sizeof( id ) ); for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) { id = (unsigned short)i; mb.update( workerIDPos, &id, sizeof( id ) ); VMPI_SendData( mb.data, mb.getLen(), whosWorking[i] ); } } int BuildWUsCompletedMessage( CUtlVector &wusCompleted, MessageBuffer &mb ) { PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WUS_COMPLETED_LIST ); m_pInfo->WriteWUIndex( wusCompleted.Count(), &mb ); for ( int i=0; i < wusCompleted.Count(); i++ ) { m_pInfo->WriteWUIndex( wusCompleted[i], &mb ); } return wusCompleted.Count(); } virtual void OnWorkerReady( int iSource ) { CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); if ( pWorkersReady->m_WorkersReady.Find( iSource ) == -1 ) { pWorkersReady->m_WorkersReady.AddToTail( iSource ); // Get this guy up to speed on which WUs are done. { CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); m_WUSCompletedMessageBuffer.setLen( 0 ); BuildWUsCompletedMessage( pWUsCompleted->m_Completed, m_WUSCompletedMessageBuffer ); m_WUsCompletedCS.Unlock(); } VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), iSource ); m_bForceShuffle = true; } m_WorkersReadyCS.Unlock(); } virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) { return Thread_HandleWorkUnitResults( iWorkUnit ); } bool Thread_HandleWorkUnitResults( WUIndexType iWorkUnit ) { if ( m_WorkUnitWalker.Thread_IsWorkUnitCompleted( iWorkUnit ) ) { return false; } else { m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWorkUnit ); // We need the lock on here because our own worker threads can call into here. CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); pWUsCompleted->m_Pending.AddToTail( iWorkUnit ); m_WUsCompletedCS.Unlock(); return true; } } virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { if ( pBuf->data[1] == DW_SUBPACKETID_REQUEST_SHUFFLE ) { if ( bIgnoreContents ) return true; m_bShuffleRequested = true; } return false; } virtual void DisconnectHandler( int workerID ) { CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); if ( pWorkersReady->m_WorkersReady.Find( workerID ) != -1 ) m_bForceShuffle = true; m_WorkersReadyCS.Unlock(); } public: CDSInfo *m_pInfo; class CWorkersReady { public: CUtlVector m_WorkersReady; // The list of workers who have said they're ready to participate. }; CCriticalSectionData m_WorkersReadyCS; class CWUsCompleted { public: CUtlVector m_Completed; // WUs completed that we have sent to workers. CUtlVector m_Pending; // WUs completed that we haven't sent to workers. }; CCriticalSectionData m_WUsCompletedCS; MessageBuffer m_WUSCompletedMessageBuffer; // Used to send lists of completed WUs. int m_bUsingMasterLocalThreads; bool m_bForceShuffle; bool m_bShuffleRequested; double m_flLastShuffleRequestServiceTime; CShuffledWorkUnitWalker m_WorkUnitWalker; }; class CDistributor_SDKWorker : public IWorkUnitDistributorWorker, public IShuffleRequester { public: virtual void Init( CDSInfo *pInfo ) { m_iMyWorkUnitWalkerID = -1; m_pInfo = pInfo; m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); } virtual void Release() { delete this; } virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) { // If we don't have an ID yet, we haven't received a Shuffle() command, so we're waiting for that before working. // TODO: we could do some random WUs here while we're waiting, although that could suck if the WUs take forever to do // and they're duplicates. if ( m_iMyWorkUnitWalkerID == -1 ) return false; // Look in our current shuffled list of work units for the next one. return m_WorkUnitWalker.Thread_GetNextWorkUnit( m_iMyWorkUnitWalkerID, pWUIndex ); } virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) { m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); } virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { // If it's a SHUFFLE message, then shuffle.. if ( pBuf->data[1] == DW_SUBPACKETID_SHUFFLE ) { if ( bIgnoreContents ) return true; unsigned short nWorkers, myID; CRC32_t shuffleCRC; pBuf->read( &nWorkers, sizeof( nWorkers ) ); pBuf->read( &shuffleCRC, sizeof( shuffleCRC ) ); pBuf->read( &myID, sizeof( myID ) ); m_iMyWorkUnitWalkerID = myID; m_WorkUnitWalker.Shuffle( nWorkers ); if ( m_WorkUnitWalker.GetShuffleCRC() != shuffleCRC ) { static int nWarnings = 1; if ( ++nWarnings <= 2 ) Warning( "\nShuffle CRC mismatch\n" ); } return true; } else if ( pBuf->data[1] == DW_SUBPACKETID_WUS_COMPLETED_LIST ) { if ( bIgnoreContents ) return true; WUIndexType nCompleted; m_pInfo->ReadWUIndex( &nCompleted, pBuf ); for ( WUIndexType i=0; i < nCompleted; i++ ) { WUIndexType iWU; m_pInfo->ReadWUIndex( &iWU, pBuf ); m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWU ); } return true; } return false; } virtual void RequestShuffle() { // Ok.. request a reshuffle. MessageBuffer mb; PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_REQUEST_SHUFFLE ); VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID ); } private: CDSInfo *m_pInfo; CShuffledWorkUnitWalker m_WorkUnitWalker; int m_iMyWorkUnitWalkerID; }; IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster() { return new CDistributor_SDKMaster; } IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker() { return new CDistributor_SDKWorker; }