//========= Copyright Valve Corporation, All rights reserved. ============// // // Purpose: // //============================================================================= #include "vmpi.h" #include "vmpi_distribute_work.h" #include "tier0/platform.h" #include "tier0/dbg.h" #include "utlvector.h" #include "utllinkedlist.h" #include "vmpi_dispatch.h" #include "pacifier.h" #include "vstdlib/random.h" #include "mathlib/mathlib.h" #include "threadhelpers.h" #include "threads.h" #include "tier1/strtools.h" #include "tier1/utlmap.h" #include "tier1/smartptr.h" #include "tier0/icommandline.h" #include "cmdlib.h" #include "vmpi_distribute_tracker.h" #include "vmpi_distribute_work_internal.h" #define DW_SUBPACKETID_WU_ASSIGNMENT (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0) static int s_numWusToDeal = -1; void VMPI_SetWorkUnitsPartitionSize( int numWusToDeal ) { s_numWusToDeal = numWusToDeal; } class CWorkUnitInfo { public: WUIndexType m_iWorkUnit; }; class CWULookupInfo { public: CWULookupInfo() : m_iWUInfo( -1 ), m_iPartition( -222222 ), m_iPartitionListIndex( -1 ) {} public: int m_iWUInfo; // Index into m_WUInfo. int m_iPartition; // Which partition it's in. int m_iPartitionListIndex; // Index into its partition's m_WUs. }; class CPartitionInfo { public: typedef CUtlLinkedList< WUIndexType, int > PartitionWUs; public: int m_iPartition; // Index into m_Partitions. int m_iWorker; // Who owns this partition? PartitionWUs m_WUs; // Which WUs are in this partition? }; // Work units tracker to track consecutive finished blocks class CWorkUnitsTracker { public: CWorkUnitsTracker() {} public: // Initializes the unit tracker to receive numUnits in future void PrepareForWorkUnits( uint64 numUnits ); // Signals that a work unit has been finished // returns a zero-based index of the next pending work unit // up to which the task list has been processed fully now // because the received work unit filled the gap or was the next pending work unit. // returns 0 to indicate that this work unit is a "faster processed future work unit". uint64 WorkUnitFinished( uint64 iWorkUnit ); public: enum WUInfo { kNone, kTrigger, kDone }; CVisibleWindowVector< uint8 > m_arrInfo; }; void CWorkUnitsTracker::PrepareForWorkUnits( uint64 numUnits ) { m_arrInfo.Reset( numUnits + 1 ); if ( numUnits ) { m_arrInfo.ExpandWindow( 2ull, kNone ); m_arrInfo.Get( 0ull ) = kTrigger; } } uint64 CWorkUnitsTracker::WorkUnitFinished( uint64 iWorkUnit ) { uint64 uiResult = uint64( 0 ); if ( iWorkUnit >= m_arrInfo.FirstPossibleIndex() && iWorkUnit < m_arrInfo.PastPossibleIndex() ) { // Need to access the element m_arrInfo.ExpandWindow( iWorkUnit + 1, kNone ); // Set it done uint8 &rchThere = m_arrInfo.Get( iWorkUnit ), chThere = rchThere; rchThere = kDone; // Should we trigger? if ( kTrigger == chThere ) { // Go along all "done" work units and trigger the last found one while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && ( kDone == m_arrInfo.Get( iWorkUnit ) ) ) continue; m_arrInfo.Get( iWorkUnit ) = kTrigger; m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); uiResult = iWorkUnit; } else if( iWorkUnit == m_arrInfo.FirstPossibleIndex() ) { // Go along all "done" work units and shrink including the last found one while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && ( kDone == m_arrInfo.Get( iWorkUnit ) ) ) continue; m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); } } return uiResult; } CWorkUnitsTracker g_MasterWorkUnitsTracker; static bool CompareSoonestWorkUnitSets( CPartitionInfo::PartitionWUs * const &x, CPartitionInfo::PartitionWUs * const &y ) { // Compare by fourth/second/first job in the partitions WUIndexType missing = ~WUIndexType(0); WUIndexType jobsX[4] = { missing, missing, missing, missing }; WUIndexType jobsY[4] = { missing, missing, missing, missing }; int counter = 0; counter = 0; FOR_EACH_LL( (*x), i ) { jobsX[ counter ++ ] = (*x)[i]; if ( counter >= 4 ) break; } counter = 0; FOR_EACH_LL( (*y), i ) { jobsY[ counter ++ ] = (*y)[i]; if ( counter >= 4 ) break; } // Compare if ( jobsX[3] != jobsY[3] ) return ( jobsX[3] < jobsY[3] ); if ( jobsX[1] != jobsY[1] ) return ( jobsX[1] < jobsY[1] ); return jobsX[0] < jobsY[0]; } class CDistributor_DefaultMaster : public IWorkUnitDistributorMaster { public: virtual void Release() { delete this; } virtual void DistributeWork_Master( CDSInfo *pInfo ) { m_pInfo = pInfo; g_MasterWorkUnitsTracker.PrepareForWorkUnits( m_pInfo->m_nWorkUnits ); m_WULookup.Reset( pInfo->m_nWorkUnits ); while ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) { VMPI_DispatchNextMessage( 200 ); VMPITracker_HandleDebugKeypresses(); if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) break; } } virtual void OnWorkerReady( int iSource ) { AssignWUsToWorker( iSource ); } virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) { CWULookupInfo *pLookup = NULL; if ( iWorkUnit >= m_WULookup.FirstPossibleIndex() && iWorkUnit < m_WULookup.PastVisibleIndex() ) pLookup = &m_WULookup.Get( iWorkUnit ); if ( !pLookup || pLookup->m_iWUInfo == -1 ) return false; // Mark this WU finished and remove it from the list of pending WUs. m_WUInfo.Remove( pLookup->m_iWUInfo ); pLookup->m_iWUInfo = -1; // Get rid of the WU from its partition. int iPartition = pLookup->m_iPartition; CPartitionInfo *pPartition = m_Partitions[iPartition]; pPartition->m_WUs.Remove( pLookup->m_iPartitionListIndex ); // Shrink the window of the lookup work units if ( iWorkUnit == m_WULookup.FirstPossibleIndex() ) { WUIndexType kwu = iWorkUnit; for ( WUIndexType kwuEnd = m_WULookup.PastVisibleIndex(); kwu < kwuEnd; ++ kwu ) { if ( -1 != m_WULookup.Get( kwu ).m_iWUInfo && kwu > iWorkUnit ) break; } m_WULookup.ShrinkWindow( kwu - 1 ); } // Give the worker some new work if need be. if ( pPartition->m_WUs.Count() == 0 ) { int iPartitionWorker = pPartition->m_iWorker; delete pPartition; m_Partitions.Remove( iPartition ); // If there are any more WUs remaining, give the worker from this partition some more of them. if ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) { AssignWUsToWorker( iPartitionWorker ); } } uint64 iDoneWorkUnits = g_MasterWorkUnitsTracker.WorkUnitFinished( iWorkUnit ); if ( iDoneWorkUnits && g_pDistributeWorkCallbacks ) { g_pDistributeWorkCallbacks->OnWorkUnitsCompleted( iDoneWorkUnits ); } return true; } virtual void DisconnectHandler( int workerID ) { int iPartitionLookup = FindPartitionByWorker( workerID ); if ( iPartitionLookup != -1 ) { // Mark this guy's partition as unowned so another worker can get it. CPartitionInfo *pPartition = m_Partitions[iPartitionLookup]; pPartition->m_iWorker = -1; } } CPartitionInfo* AddPartition( int iWorker ) { CPartitionInfo *pNew = new CPartitionInfo; pNew->m_iPartition = m_Partitions.AddToTail( pNew ); pNew->m_iWorker = iWorker; return pNew; } bool SplitWUsPartition( CPartitionInfo *pPartitionLarge, CPartitionInfo **ppFirstHalf, CPartitionInfo **ppSecondHalf, int iFirstHalfWorker, int iSecondHalfWorker ) { int nCount = pPartitionLarge->m_WUs.Count(); if ( nCount > 1 ) // Allocate the partitions for the two workers { *ppFirstHalf = AddPartition( iFirstHalfWorker ); *ppSecondHalf = AddPartition( iSecondHalfWorker ); } else // Specially transfer a partition with too few work units { *ppFirstHalf = NULL; *ppSecondHalf = AddPartition( iSecondHalfWorker ); } // Prepare for transfer CPartitionInfo *arrNewParts[2] = { *ppFirstHalf ? *ppFirstHalf : *ppSecondHalf, *ppSecondHalf }; // Transfer the work units: // alternate first/second halves // don't put more than "half deal units" tasks into the second half // e.g. { 1, 2, 3, 4 } // becomes: 1st half { 1, 2 }, 2nd half { 3, 4 } for ( int k = 0; k < nCount; ++ k ) { int iHead = pPartitionLarge->m_WUs.Head(); WUIndexType iWU = pPartitionLarge->m_WUs[ iHead ]; pPartitionLarge->m_WUs.Remove( iHead ); /* int nHalf = !!( ( k % 2 ) || ( k >= nCount - 1 ) ); if ( k == 5 ) // no more than 2 jobs to branch off arrNewParts[ 1 ] = arrNewParts[ 0 ]; */ int nHalf = !( k < nCount/2 ); CPartitionInfo *pTo = arrNewParts[ nHalf ]; CWULookupInfo &li = m_WULookup.Get( iWU ); li.m_iPartition = pTo->m_iPartition; li.m_iPartitionListIndex = pTo->m_WUs.AddToTail( iWU ); } // LogPartitionsWorkUnits( pInfo ); return true; } void AssignWUsToWorker( int iWorker ) { // Get rid of this worker's old partition. int iPrevious = FindPartitionByWorker( iWorker ); if ( iPrevious != -1 ) { delete m_Partitions[iPrevious]; m_Partitions.Remove( iPrevious ); } if ( g_iVMPIVerboseLevel >= 1 ) Msg( "A" ); CVisibleWindowVector< CWULookupInfo > &vlkup = m_WULookup; if ( CommandLine()->FindParm( "-mpi_NoScheduler" ) ) { Warning( "\n\n-mpi_NoScheduler found: Warning - this should only be used for testing and with 1 worker!\n\n" ); vlkup.ExpandWindow( m_pInfo->m_nWorkUnits ); CPartitionInfo *pPartition = AddPartition( iWorker ); for ( int i=0; i < m_pInfo->m_nWorkUnits; i++ ) { CWorkUnitInfo info; info.m_iWorkUnit = i; CWULookupInfo &li = vlkup.Get( i ); li.m_iPartition = pPartition->m_iPartition; li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); li.m_iWUInfo = m_WUInfo.AddToTail( info ); } SendPartitionToWorker( pPartition, iWorker ); return; } // Any partitions abandoned by workers? int iAbandonedPartition = FindPartitionByWorker( -1 ); if ( -1 != iAbandonedPartition ) { CPartitionInfo *pPartition = m_Partitions[ iAbandonedPartition ]; pPartition->m_iWorker = iWorker; SendPartitionToWorker( pPartition, iWorker ); } // Any absolutely untouched partitions yet? else if ( vlkup.PastVisibleIndex() < vlkup.PastPossibleIndex() ) { // Figure out how many WUs to include in a batch int numWusToDeal = s_numWusToDeal; if ( numWusToDeal <= 0 ) { uint64 uiFraction = vlkup.PastPossibleIndex() / g_nMaxWorkerCount; Assert( uiFraction < INT_MAX/2 ); numWusToDeal = int( uiFraction ); if ( numWusToDeal <= 0 ) numWusToDeal = 8; } // Allocate room for upcoming work units lookup WUIndexType iBegin = vlkup.PastVisibleIndex(); WUIndexType iEnd = min( iBegin + g_nMaxWorkerCount * numWusToDeal, vlkup.PastPossibleIndex() ); vlkup.ExpandWindow( iEnd - 1 ); // Allocate a partition size_t numPartitions = min( ( size_t )(iEnd - iBegin), ( size_t )g_nMaxWorkerCount ); CArrayAutoPtr< CPartitionInfo * > spArrPartitions( new CPartitionInfo* [ numPartitions ] ); CPartitionInfo **arrPartitions = spArrPartitions.Get(); arrPartitions[0] = AddPartition( iWorker ); for ( size_t k = 1; k < numPartitions; ++ k ) arrPartitions[k] = AddPartition( -1 ); // Assign upcoming work units to the partitions. for ( WUIndexType i = iBegin ; i < iEnd; ++ i ) { CWorkUnitInfo info; info.m_iWorkUnit = i; CPartitionInfo *pPartition = arrPartitions[ size_t( (i - iBegin) % numPartitions ) ]; CWULookupInfo &li = vlkup.Get( i ); li.m_iPartition = pPartition->m_iPartition; li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); li.m_iWUInfo = m_WUInfo.AddToTail( info ); } // Now send this guy the WU list in his partition. SendPartitionToWorker( arrPartitions[0], iWorker ); } // Split one of the last partitions to finish sooner else { // Find a partition to split. int iPartToSplit = FindSoonestPartition(); if ( iPartToSplit >= 0 ) { CPartitionInfo *pPartition = m_Partitions[ iPartToSplit ]; CPartitionInfo *pOldHalf = NULL, *pNewHalf = NULL; int iOldWorker = pPartition->m_iWorker, iNewWorker = iWorker; if ( SplitWUsPartition( pPartition, &pOldHalf, &pNewHalf, iOldWorker, iNewWorker ) ) { if ( pOldHalf ) SendPartitionToWorker( pOldHalf, iOldWorker ); if ( pNewHalf ) SendPartitionToWorker( pNewHalf, iNewWorker ); // Delete the partition that got split Assert( pPartition->m_WUs.Count() == 0 ); delete pPartition; m_Partitions.Remove( iPartToSplit ); } } } } int FindSoonestPartition() { CUtlLinkedList < CPartitionInfo *, int > &lst = m_Partitions; // Sorted partitions CUtlMap< CPartitionInfo::PartitionWUs *, int > sortedPartitions ( CompareSoonestWorkUnitSets ); sortedPartitions.EnsureCapacity( lst.Count() ); FOR_EACH_LL( lst, i ) { sortedPartitions.Insert( &lst[i]->m_WUs, i ); } if ( sortedPartitions.Count() ) { return sortedPartitions.Element( sortedPartitions.FirstInorder() ); } return lst.Head(); } int FindPartitionByWorker( int iWorker ) { FOR_EACH_LL( m_Partitions, i ) { if ( m_Partitions[i]->m_iWorker == iWorker ) return i; } return -1; } void SendPartitionToWorker( CPartitionInfo *pPartition, int iWorker ) { // Stuff the next nWUs work units into the buffer. MessageBuffer mb; PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_ASSIGNMENT ); FOR_EACH_LL( pPartition->m_WUs, i ) { WUIndexType iWU = pPartition->m_WUs[i]; mb.write( &iWU, sizeof( iWU ) ); VMPITracker_WorkUnitSentToWorker( ( int ) iWU, iWorker ); } VMPI_SendData( mb.data, mb.getLen(), iWorker ); } virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { return false; } private: CDSInfo *m_pInfo; CUtlLinkedList m_Partitions; CVisibleWindowVector m_WULookup; // Map work unit index to CWorkUnitInfo. CUtlLinkedList m_WUInfo; // Sorted with most elegible WU at the head. }; class CDistributor_DefaultWorker : public IWorkUnitDistributorWorker { public: virtual void Release() { delete this; } virtual void Init( CDSInfo *pInfo ) { } virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) { CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); // NOTE: this is called from INSIDE worker threads. if ( m_WorkUnits.Count() == 0 ) { return false; } else { *pWUIndex = m_WorkUnits[ m_WorkUnits.Head() ]; m_WorkUnits.Remove( m_WorkUnits.Head() ); return true; } } virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) { } virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) { if ( pBuf->data[1] == DW_SUBPACKETID_WU_ASSIGNMENT ) { // If the message wasn't even related to the current DistributeWork() call we're on, ignore it. if ( bIgnoreContents ) return true; if ( ((pBuf->getLen() - pBuf->getOffset()) % sizeof( WUIndexType )) != 0 ) { Error( "DistributeWork: invalid work units packet from master" ); } // Parse out the work unit indices. CCriticalSectionLock csLock( &m_CS ); csLock.Lock(); m_WorkUnits.Purge(); int nIndices = (pBuf->getLen() - pBuf->getOffset()) / sizeof( WUIndexType ); for ( int i=0; i < nIndices; i++ ) { WUIndexType iWU; pBuf->read( &iWU, sizeof( iWU ) ); // Add the index to the list. m_WorkUnits.AddToTail( iWU ); } csLock.Unlock(); return true; } else { return false; } } // Threads eat up the list of WUs in here. CCriticalSection m_CS; CUtlLinkedList m_WorkUnits; // A list of work units assigned to this worker }; IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster() { return new CDistributor_DefaultMaster; } IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker() { return new CDistributor_DefaultWorker; }