//====== Copyright (c), Valve Corporation, All rights reserved. ======= // // Purpose: Implements parallel job farming process // //============================================================================= #include "stdafx.h" #include "rtime.h" #include "gcparalleljobfarm.h" namespace GCSDK { bool IYieldingParallelFarmJobHandler::BYieldingExecuteParallel( int numJobsParallel, char const *pchJobName, uint nTimeoutSec ) { AssertRunningJob(); if ( !pchJobName ) pchJobName = GJobCur().GetName(); struct CParallelFarmHeapData_t { explicit CParallelFarmHeapData_t( IYieldingParallelFarmJobHandler *pHandler, int numJobsFarmLimit ) { m_pHandler = pHandler; m_jobIdParent = GJobCur().GetJobID(); m_numJobsFarmed = 0; m_numJobsFarmLimit = MAX( 1, numJobsFarmLimit ); m_iJobSequenceCounter = 0; m_bErrorEncountered = false; m_bWorkloadCompleted = false; } IYieldingParallelFarmJobHandler *m_pHandler; JobID_t m_jobIdParent; int m_numJobsFarmLimit; int m_numJobsFarmed; int m_iJobSequenceCounter; bool m_bErrorEncountered; bool m_bWorkloadCompleted; }; CParallelFarmHeapData_t *pHeapData = new CParallelFarmHeapData_t( this, numJobsParallel ); class CYieldingParallelFarmJob : public CGCJob { public: CYieldingParallelFarmJob( CGCBase *pGC, CParallelFarmHeapData_t *pJobData, char const *pchJobName, uint nTimeoutSec ) : CGCJob( pGC, pchJobName ) , m_pJobData( pJobData ), m_iJobSequenceCounter( pJobData->m_iJobSequenceCounter ), m_nTimeoutSec( nTimeoutSec ) { } virtual bool BYieldingRunJob( void *pvStartParam ) { if ( m_nTimeoutSec ) SetJobTimeout( m_nTimeoutSec ); bool bWorkloadCompleted = false; bool bResult = m_pJobData->m_pHandler ? m_pJobData->m_pHandler->BYieldingRunWorkload( m_iJobSequenceCounter, &bWorkloadCompleted ) : false; if ( !bResult ) m_pJobData->m_bErrorEncountered = true; else if ( bWorkloadCompleted ) m_pJobData->m_bWorkloadCompleted = true; -- m_pJobData->m_numJobsFarmed; if ( !m_pJobData->m_bErrorEncountered && !m_pJobData->m_bWorkloadCompleted ) { CYieldingParallelFarmJob *pFarmedJob = new CYieldingParallelFarmJob( m_pGC, m_pJobData, GetName(), m_nTimeoutSec ); ++ m_pJobData->m_numJobsFarmed; ++ m_pJobData->m_iJobSequenceCounter; pFarmedJob->StartJobDelayed( NULL ); } if ( !m_pJobData->m_numJobsFarmed ) { // No more farmed jobs to wait for m_pGC->GetJobMgr().BRouteWorkItemCompletedDelayed( m_pJobData->m_jobIdParent, false ); } return bResult; } protected: CParallelFarmHeapData_t *m_pJobData; int m_iJobSequenceCounter; uint m_nTimeoutSec; }; for ( ; ; ++ pHeapData->m_iJobSequenceCounter ) { if ( pHeapData->m_numJobsFarmed < pHeapData->m_numJobsFarmLimit ) { CYieldingParallelFarmJob *pFarmedJob = new CYieldingParallelFarmJob( GGCBase(), pHeapData, pchJobName, nTimeoutSec ); ++ pHeapData->m_numJobsFarmed; pFarmedJob->StartJobDelayed( NULL ); } else { if ( !GJobCur().BYieldingWaitForWorkItem( pchJobName ) ) { EmitError( SPEW_GC, "YieldingExecuteParallel: failed to sync with %u farmed work items.\n", pHeapData->m_numJobsFarmed ); pHeapData->m_bErrorEncountered = true; pHeapData->m_pHandler = NULL; // handler itself may become invalid when the function returns return false; // leak pHeapData because work items might still be running and this can avoid a crash (this condition is abnormal) } break; } } bool bResult = pHeapData->m_bWorkloadCompleted && !pHeapData->m_bErrorEncountered; delete pHeapData; return bResult; } }