//========= Copyright Valve Corporation, All rights reserved. ============// // // Purpose: This module implements the subset of MPI that VRAD and VVIS use. // // $NoKeywords: $ //=============================================================================// #include #include #include #include #include #include #include "iphelpers.h" #include "utlvector.h" #include "utllinkedlist.h" #include "vmpi.h" #include "bitbuf.h" #include "tier1/strtools.h" #include "threadhelpers.h" #include "IThreadedTCPSocket.h" #include "vstdlib/random.h" #include "vmpi_distribute_work.h" #include "filesystem.h" #include "checksum_md5.h" #include "tslist.h" #include "tier0/icommandline.h" #define DEFAULT_MAX_WORKERS 32 // Unless they specify -mpi_MaxWorkers, it will stop accepting workers after it gets this many. int g_nMaxWorkerCount = DEFAULT_MAX_WORKERS; #define VMPI_INTERNAL_PACKET_ID 27 #define VMPI_INTERNAL_SUBPACKET_MACHINE_NAME 1 #define VMPI_INTERNAL_SUBPACKET_COMMAND_LINE 2 #define VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE 3 #define VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET 4 #define VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE 5 #define VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME 6 typedef CUtlVector PersistentPacket; CCriticalSection g_PersistentPacketsCS; CUtlLinkedList g_PersistentPackets; // Command-line parameters list. #define VMPI_PARAM( paramName, paramFlags, helpText ) {paramName, paramFlags, "-"#paramName, helpText}, class CVMPIParam { public: EVMPICmdLineParam m_eParam; int m_ParamFlags; const char *m_pName; const char *m_pHelpText; }; static CVMPIParam g_VMPIParams[] = { {k_eVMPICmdLineParam_FirstParam, 0, "k_eVMPICmdLineParam_FirstParam", "unused"}, {k_eVMPICmdLineParam_VMPIParam, 0, "mpi", "Enable VMPI."}, #include "vmpi_parameters.h" }; #undef VMPI_PARAM // ---------------------------------------------------------------------------------------- // // Globals. // ---------------------------------------------------------------------------------------- // class CVMPIConnection; // Used by -mpi_AutoRestart. CUtlVector g_OriginalCommandLineParameters; // This queues up all the incoming VMPI messages. CCriticalSection g_VMPIMessagesCS; CUtlLinkedList< CTCPPacket*, int > g_VMPIMessages; CEvent g_VMPIMessagesEvent; // This is set when there are messages in the queue. // These are used to notify the main thread when some socket had OnError() called on it. CUtlLinkedList g_ErrorSockets; CEvent g_ErrorSocketsEvent; CCriticalSection g_ErrorSocketsCS; bool g_bTimingWaitDone = false; bool g_bGroupPackets = false; #define MAX_VMPI_CONNECTIONS 512 CVMPIConnection *g_Connections[MAX_VMPI_CONNECTIONS]; int g_nConnections = 0; CCriticalSection g_ConnectionsCS; // If true, then it will set certain thread priorities low. bool g_bSetThreadPriorities = true; VMPIDispatchFn g_VMPIDispatch[MAX_VMPI_PACKET_IDS]; CTSList g_DispatchBuffers; VMPIRunMode g_VMPIRunMode = VMPI_RUN_NETWORKED; VMPIFileSystemMode g_VMPIFileSystemMode = VMPI_FILESYSTEM_TCP; static char g_GroupedPacketHeader[] = { VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET }; static unsigned long g_LastFlushGroupedPacketsTime = 0; // Set to true if we're running under the SDK (i.e. vmpi_transfer.exe is not found). bool g_bVMPISDKMode = false; bool g_bVMPISDKModeSet = false; // If g_bVMPISDKMode has not been set, then VMPI_IsSDKMode just looks for VMPI_Transfer (and doesn't check the command line). int g_nBytesSent = 0; int g_nMessagesSent = 0; int g_nBytesReceived = 0; int g_nMessagesReceived = 0; int g_nMulticastBytesSent = 0; int g_nMulticastBytesReceived = 0; CUtlLinkedList g_DisconnectHandlers; bool g_bUseMPI = false; int g_iVMPIVerboseLevel = 0; bool g_bMPIMaster = false; bool g_bMPI_Stats = false; bool g_bMPI_StatsTextOutput = false; char g_CurrentStageString[128] = ""; CCriticalSection g_CurrentStageCS; char g_MasterExeName[MAX_PATH]; bool g_bReceivedMasterExeName = false; // Change our window text. HINSTANCE g_hKernel32DLL = NULL; typedef HWND (*GetConsoleWndFn)(); GetConsoleWndFn g_pConsoleWndFn = NULL; // ---------------------------------------------------------------------------------------- // // Classes. // ---------------------------------------------------------------------------------------- // // This class is used while discovering what files the workers need. class CDependencyInfo { public: class CDependencyFile { public: char m_Name[MAX_PATH]; }; // This is the directory where the dependency files live (i.e. all the binaries that the workers need to run the job). char m_DependencyFilesDir[MAX_PATH]; // "vrad.exe", "vvis.exe", etc. char m_OriginalExeFilename[MAX_PATH]; CUtlVector m_Files; public: CDependencyFile* FindFile( const char *pFilename ) { for ( int i=0; i < m_Files.Count(); i++ ) { if ( stricmp( pFilename, m_Files[i]->m_Name ) == 0 ) return m_Files[i]; } return NULL; } }; class CVMPIConnection : public ITCPSocketHandler { public: CVMPIConnection( int iConnection ) { m_iConnection = iConnection; m_pSocket = NULL; m_bIsAService = false; char str[512]; Q_snprintf( str, sizeof( str ), "%d", iConnection ); SetMachineName( str ); m_JobWorkerID = 0xFFFFFFFF; m_bNameSet = false; } ~CVMPIConnection() { if ( m_pSocket ) m_pSocket->Release(); } public: void HandleDisconnect() { if ( m_pSocket ) { // Copy out the error string. CCriticalSectionLock csLock( &g_ErrorSocketsCS ); csLock.Lock(); char str[512]; Q_strncpy( str, m_ErrorString.Base(), sizeof( str ) ); csLock.Unlock(); // Tell the app. FOR_EACH_LL( g_DisconnectHandlers, i ) g_DisconnectHandlers[i]( m_iConnection, str ); // Free our socket. m_pSocket->Release(); m_pSocket = NULL; } } IThreadedTCPSocket* GetSocket() { return m_pSocket; } void SetMachineName( const char *pName ) { m_MachineName.CopyArray( pName, strlen( pName ) + 1 ); m_bNameSet = true; } const char* GetMachineName() { return m_MachineName.Base(); } bool HasMachineNameBeenSet() { return m_bNameSet; } // ITCPSocketHandler implementation (thread-safe stuff). public: virtual void Init( IThreadedTCPSocket *pSocket ) { m_pSocket = pSocket; } virtual void OnPacketReceived( CTCPPacket *pPacket ) { // Set who this message came from. pPacket->SetUserData( m_iConnection ); Assert( m_iConnection >= 0 && m_iConnection < 2048 ); // Store it in the global list. CCriticalSectionLock csLock( &g_VMPIMessagesCS ); csLock.Lock(); g_VMPIMessages.AddToTail( pPacket ); if ( g_VMPIMessages.Count() == 1 ) g_VMPIMessagesEvent.SetEvent(); } virtual void OnError( int errorCode, const char *pErrorString ) { if ( !g_bMPIMaster ) { Msg( "%s - CVMPIConnection::OnError( %s )\n", GetMachineName(), pErrorString ); } CCriticalSectionLock csLock( &g_ErrorSocketsCS ); csLock.Lock(); m_ErrorString.CopyArray( pErrorString, strlen( pErrorString ) + 1 ); g_ErrorSockets.AddToTail( this ); // Notify the main thread that a socket is in trouble! g_ErrorSocketsEvent.SetEvent(); // Make sure the main thread picks up this error soon. InterlockedIncrement( &m_ErrorSignal ); } public: unsigned long m_JobWorkerID; bool m_bIsAService; // If true, then this is just a service getting the files. Don't count it as an active worker. CUtlVector m_GroupedChunkLengths; CUtlVector m_GroupedChunks; private: CUtlVector m_MachineName; CUtlVector m_ErrorString; long m_ErrorSignal; int m_iConnection; IThreadedTCPSocket *m_pSocket; bool m_bNameSet; }; class CVMPIConnectionCreator : public IHandlerCreator { public: virtual ITCPSocketHandler* CreateNewHandler() { Assert( g_nConnections < MAX_VMPI_CONNECTIONS ); CVMPIConnection *pRet = new CVMPIConnection( g_nConnections ); g_Connections[g_nConnections++] = pRet; return pRet; } }; // ---------------------------------------------------------------------------------------- // // Helpers. // ---------------------------------------------------------------------------------------- // const char* VMPI_FindArg( int argc, char **argv, const char *pName, const char *pDefault ) { for ( int i=0; i < argc; i++ ) { if ( stricmp( argv[i], pName ) == 0 ) { if ( (i+1) < argc ) return argv[i+1]; else return pDefault; } } return NULL; } void ParseOptions( int argc, char **argv ) { if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_NoTimeout ) ) ) ThreadedTCP_EnableTimeouts( false ); if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_DontSetThreadPriorities ) ) ) { Msg( "%s found.\n", VMPI_GetParamString( mpi_DontSetThreadPriorities ) ); g_bSetThreadPriorities = false; ThreadedTCP_SetTCPSocketThreadPriorities( false ); } if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_GroupPackets ) ) ) { Msg( "%s found.\n", VMPI_GetParamString( mpi_GroupPackets ) ); g_bGroupPackets = true; } const char *pTransmitRate = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_FileTransmitRate ), "1" ); if ( pTransmitRate ) { extern int MULTICAST_TRANSMIT_RATE; MULTICAST_TRANSMIT_RATE = atoi( pTransmitRate ) * 1024; } const char *pVerbose = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Verbose ), "1" ); if ( pVerbose ) { if ( pVerbose[0] == '1' ) g_iVMPIVerboseLevel = 1; else if ( pVerbose[0] == '2' ) g_iVMPIVerboseLevel = 2; } if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Stats ) ) ) g_bMPI_Stats = true; if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Stats_TextOutput ) ) ) g_bMPI_StatsTextOutput = true; } void SetupDependencyFilename( CDependencyInfo *pInfo, const char *pPatchDirectory ) { char baseExeFilename[512]; if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) Error( "GetModuleFileName failed." ); // If they're in patch mode, then the dependency files come out of a directory they've passed in. // Otherwise, the files come from the same exe dir we're in (like c:\valve\game\bin). if ( pPatchDirectory ) { V_strncpy( pInfo->m_DependencyFilesDir, pPatchDirectory, sizeof( pInfo->m_DependencyFilesDir ) ); } else { V_strncpy( pInfo->m_DependencyFilesDir, baseExeFilename, sizeof( pInfo->m_DependencyFilesDir ) ); V_StripLastDir( pInfo->m_DependencyFilesDir, sizeof( pInfo->m_DependencyFilesDir ) ); } // Get the exe filename. V_strncpy( pInfo->m_OriginalExeFilename, V_UnqualifiedFileName( baseExeFilename ), sizeof( pInfo->m_OriginalExeFilename ) ); } bool ReadString( char *pOut, int maxLen, FILE *fp ) { if ( !fgets( pOut, maxLen, fp ) || pOut[0] == 0 ) return false; int len = strlen( pOut ); if ( pOut[len - 1] == '\n' ) pOut[len - 1] = 0; return true; } void ParseDependencyFile( CDependencyInfo *pInfo, const char *pDepFilename ) { FILE *fp = fopen( pDepFilename, "rt" ); if ( !fp ) Error( "Can't find %s.", pDepFilename ); const char *pOptionalPrefix = "optional "; char tempStr[MAX_PATH]; while ( ReadString( tempStr, sizeof( tempStr ), fp ) ) { CDependencyInfo::CDependencyFile *pFile = new CDependencyInfo::CDependencyFile; bool bOptional = false; if ( strstr( tempStr, "optional " ) == tempStr ) { bOptional = true; Q_strncpy( pFile->m_Name, tempStr + strlen( pOptionalPrefix ), sizeof( pFile->m_Name ) ); } else { Q_strncpy( pFile->m_Name, tempStr, sizeof( pFile->m_Name ) ); } // Now get the file info. char fullFilename[MAX_PATH]; V_ComposeFileName( pInfo->m_DependencyFilesDir, pFile->m_Name, fullFilename, sizeof( fullFilename ) ); if ( _access( fullFilename, 0 ) == 0 ) { pInfo->m_Files.AddToTail( pFile ); } else { delete pFile; if ( !bOptional ) Error( "Can't find %s (listed in %s).", fullFilename, pDepFilename ); } } fclose( fp ); } void SetupDependenciesForPatch( CDependencyInfo *pInfo, const char *pPatchDirectory ) { char searchStr[MAX_PATH]; V_ComposeFileName( pPatchDirectory, "*.*", searchStr, sizeof( searchStr ) ); _finddata_t data; long handle = _findfirst( searchStr, &data ); if ( handle != -1 ) { do { if ( data.name[0] == '.' || (data.attrib & _A_SUBDIR) != 0 ) continue; CDependencyInfo::CDependencyFile *pFile = new CDependencyInfo::CDependencyFile; V_strncpy( pFile->m_Name, data.name, sizeof( pFile->m_Name ) ); pInfo->m_Files.AddToTail( pFile ); } while( _findnext( handle, &data ) == 0 ); _findclose( handle ); } } void SetupDependencyInfo( CDependencyInfo *pInfo, const char *pDependencyFilename, bool bPatchMode ) { if ( bPatchMode ) { const char *pPatchDirectory = pDependencyFilename; SetupDependencyFilename( pInfo, pPatchDirectory ); SetupDependenciesForPatch( pInfo, pPatchDirectory ); } else { SetupDependencyFilename( pInfo, NULL ); // Parse the dependency file. char depFilename[MAX_PATH]; V_ComposeFileName( pInfo->m_DependencyFilesDir, pDependencyFilename, depFilename, sizeof( depFilename ) ); ParseDependencyFile( pInfo, depFilename ); } } int GetCurMicrosecondsAndSleep( int sleepLen ) { Sleep( sleepLen ); CCycleCount cnt; cnt.Sample(); return cnt.GetMicroseconds(); } void CountActiveConnections( int *nRegularWorkers, int *nServiceDownloaders ) { *nRegularWorkers = *nServiceDownloaders = 0; int nTotalConnections = g_nConnections; for ( int i=0; i < nTotalConnections; i++ ) { if ( VMPI_IsProcConnected( i ) ) { if ( VMPI_IsProcAService( i ) ) (*nServiceDownloaders)++; else (*nRegularWorkers)++; } } } // In this function, we update the window text to tell how many active workers there are. void UpdateActiveConnectionsText() { if ( !g_bMPIMaster || !g_pConsoleWndFn ) return; HWND hWnd = g_pConsoleWndFn(); if ( !hWnd ) return; int nRegularWorkers, nDownloaders; CountActiveConnections( &nRegularWorkers, &nDownloaders ); char str[512]; if ( g_bVMPISDKMode ) { V_snprintf( str, sizeof( str ), "VMPI (SDK) - Workers: %d", nRegularWorkers ); } else { V_snprintf( str, sizeof( str ), "VMPI - Workers: %d, Downloaders: %d", nRegularWorkers, nDownloaders ); } SetWindowText( hWnd, str ); } void VMPI_SendMachineNameTo( int iProc ) { const char *pMyName = VMPI_GetLocalMachineName(); unsigned char packetData[512]; packetData[0] = VMPI_INTERNAL_PACKET_ID; packetData[1] = VMPI_INTERNAL_SUBPACKET_MACHINE_NAME; Q_strncpy( (char*)&packetData[2], pMyName, sizeof( packetData ) - 2 ); VMPI_SendData( packetData, 2 + strlen( pMyName ) + 1, iProc ); } static CVMPIConnection* FindConnectionBySocket( IThreadedTCPSocket *pSocket, bool bLockConnections ) { CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); if ( bLockConnections ) connectionsLock.Lock(); for ( int i=0; i < g_nConnections; i++ ) if ( g_Connections[i]->GetSocket() == pSocket ) return g_Connections[i]; return NULL; } static char* CopyString( const char *pStr ) { int len = V_strlen( pStr ) + 1; char *pArg = new char[len]; Q_strncpy( pArg, pStr, len ); return pArg; } // ---------------------------------------------------------------------------------------- // // Internal VMPI dispatch.. // ---------------------------------------------------------------------------------------- // void VMPI_SetMachineName( int iProc, const char *pName ); CUtlVector g_WorkerCommandLine; bool g_bReceivedWorkerCommandLine = false; bool VMPI_InternalDispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID ) { if ( pBuf->getLen() >= 2 ) { if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_MACHINE_NAME ) { if ( pBuf->getLen() >= 3 ) { VMPI_SetMachineName( iSource, &pBuf->data[2] ); return true; } } else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE ) { if ( !VMPI_IsSDKMode() ) { Warning( "Worker %d is running in SDK mode (and the master is not)!\n", iSource ); } return true; } else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_COMMAND_LINE ) { pBuf->setOffset( 2 ); int nArgs; pBuf->read( &nArgs, sizeof( nArgs ) ); for ( int i=0; i < nArgs; i++ ) { char str[4096]; if ( pBuf->ReadString( str, sizeof( str ) ) == -1 ) Error( "Error in ReadString() while reading command line." ); g_WorkerCommandLine.AddToTail( CopyString( str ) ); } g_bReceivedWorkerCommandLine = true; return true; } else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME ) { pBuf->setOffset( 2 ); if ( pBuf->ReadString( g_MasterExeName, sizeof( g_MasterExeName ) ) == -1 ) Error( "Error in ReadString() while reading VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME." ); g_bReceivedMasterExeName = true; return true; } else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE ) { g_bTimingWaitDone = true; return true; } } return false; } CDispatchReg g_VMPIInternalDispatchReg( VMPI_INTERNAL_PACKET_ID, VMPI_InternalDispatchFn ); // register to handle the messages we want void VMPI_SendCommandLine( int argc, char **argv ) { MessageBuffer mb; char cPacketHeader[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_COMMAND_LINE}; mb.write( cPacketHeader, sizeof( cPacketHeader ) ); mb.write( &argc, sizeof( argc ) ); for ( int i=0; i < argc; i++ ) mb.WriteString( argv[i] ); VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT ); } void VMPI_ReceiveCommandLine() { // For verification purposes, tell the master we're trying to get the command line. unsigned char chData[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE}; VMPI_SendData( chData, sizeof( chData ), VMPI_MASTER_ID ); double startTime = Plat_FloatTime(); while ( !g_bReceivedWorkerCommandLine ) { if ( Plat_FloatTime() - startTime > 30 ) Error( "VMPI_ReceiveCommandLine: timeout. Is the master running in SDK mode?" ); VMPI_DispatchNextMessage( 10 * 1000 ); } } void VMPI_SendExeName() { MessageBuffer mb; char cPacketHeader[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME}; mb.write( cPacketHeader, sizeof( cPacketHeader ) ); char baseExeFilename[MAX_PATH], fileBase[MAX_PATH]; if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." ); V_FileBase( baseExeFilename, fileBase, sizeof( fileBase ) ); mb.WriteString( fileBase ); VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT ); } void VMPI_ReceiveExeName() { double startTime = Plat_FloatTime(); while ( !g_bReceivedMasterExeName ) { if ( Plat_FloatTime() - startTime > 30 ) Error( "VMPI_ReceiveExeName: timeout." ); VMPI_DispatchNextMessage( 10 * 1000 ); } // Now compare the exe name we got with our own. char baseExeFilename[MAX_PATH], fileBase[MAX_PATH]; if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." ); // Unless we're a vmpi_transfer.. vmpi_transfer can always connect. V_FileBase( baseExeFilename, fileBase, sizeof( fileBase ) ); if ( V_stricmp( fileBase, "vmpi_transfer" ) != 0 ) { if ( V_stricmp( fileBase, g_MasterExeName ) != 0 ) { Error( "VMPI_ReceiveExeName: mismatched exe names (master: %s, me: %s).\nThis usually just means the master finished" " a job like vvis really fast and started a vrad immediately, and an old vvis worker connected to the new vrad job.", g_MasterExeName, fileBase ); } } } // ---------------------------------------------------------------------------------------- // // CMasterBroadcaster // This class broadcasts messages looking for workers. The app updates it as often as possible // and it'll add workers as necessary. // ---------------------------------------------------------------------------------------- // #define MASTER_BROADCAST_INTERVAL 600 // Send every N milliseconds. class CMasterBroadcaster { public: CMasterBroadcaster(); ~CMasterBroadcaster(); bool Init( int argc, char **argv, const char *pDependencyFilename, int nMaxWorkers, VMPIRunMode runMode, bool bPatchMode ); void Term(); // What port is it listening on? int GetListenPort() const; // These can be used to allow more workers on or filter who's able to connect int GetMaxWorkers() const; void IncreaseMaxWorkers( int count ); void SetPassword( const char *pPassword ); void SetNoTimeoutOption(); private: void GetPatchWorkerList( int argc, char **argv ); private: class CMasterBroadcastInfo { public: int m_JobID[4]; char m_Password[256]; char m_WorkerExeFilename[MAX_PATH]; CUtlVector m_Args; char m_PatchVersion[32]; // 0 if not patching. bool m_bForcePatch; }; void ThreadFn(); static DWORD WINAPI StaticThreadFn( LPVOID lpParameter ); bool Update(); void BuildBroadcastPacket( bf_write &buf ); private: ITCPConnectSocket *m_pListenSocket; ITCPConnectSocket *m_pDownloaderListenSocket; ISocket *m_pSocket; DWORD m_LastSendTime; CMasterBroadcastInfo m_BroadcastInfo; CUtlVector m_PatchWorkerIPs; // If in patch mode, these are the IPs we send the job request to (instead of broadcasting). bool m_bPatching; CVMPIConnectionCreator m_ConnectionCreator; int m_nMaxWorkers; HANDLE m_hThread; CEvent m_hShutdownEvent; CEvent m_hShutdownReply; VMPIRunMode m_RunMode; int m_iListenPort; int m_iDownloaderListenPort; }; CMasterBroadcaster::CMasterBroadcaster() { m_pListenSocket = NULL; m_pDownloaderListenSocket = NULL; m_pSocket = NULL; m_iListenPort = -1; m_iDownloaderListenPort = -1; } CMasterBroadcaster::~CMasterBroadcaster() { Term(); } void CMasterBroadcaster::GetPatchWorkerList( int argc, char **argv ) { m_PatchWorkerIPs.Purge(); for ( int i=0; i < argc-1; i++ ) { if ( V_stricmp( argv[i], "-mpi_PatchWorkers" ) == 0 ) { int workerCount = atoi( argv[i+1] ); for ( int iWorker=0; iWorker < workerCount; iWorker++ ) { int iArg = i+2 + iWorker; if ( iArg >= argc ) Error( "-mpi_PatchWorkers: %d specified for count, but not enough IPs following.\n", workerCount ); int a, b, c, d; const char *pArg = argv[iArg]; sscanf( pArg, "%d.%d.%d.%d", &a, &b, &c, &d ); CIPAddr addr; addr.Init( a, b, c, d, 0 ); m_PatchWorkerIPs.AddToTail( addr ); } return; } } } bool CMasterBroadcaster::Init( int argc, char **argv, const char *pDependencyFilename, int nMaxWorkers, VMPIRunMode runMode, bool bPatchMode ) { m_RunMode = runMode; m_nMaxWorkers = nMaxWorkers; // Open the file that tells us which binaries we depend on. CDependencyInfo dependencyInfo; if ( m_RunMode == VMPI_RUN_NETWORKED && !g_bVMPISDKMode ) { SetupDependencyInfo( &dependencyInfo, pDependencyFilename, bPatchMode ); } m_pListenSocket = NULL; m_pDownloaderListenSocket = NULL; const char *pPortStr = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Port ) ); if ( pPortStr ) { m_iListenPort = atoi( pPortStr ); m_iDownloaderListenPort = m_iListenPort + 1; m_pListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iListenPort ); if ( !g_bVMPISDKMode ) { m_pDownloaderListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iDownloaderListenPort ); } } else { // Create a socket to listen on. CCycleCount cnt; cnt.Sample(); int iTime = (int)cnt.GetMicroseconds(); srand( (unsigned)iTime ); for ( int iTest=VMPI_MASTER_FIRST_PORT; iTest <= VMPI_MASTER_LAST_PORT; iTest++ ) { m_iListenPort = iTest; m_pListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iListenPort ); if ( m_pListenSocket ) break; } // No need to create the downloader in SDK mode. if ( m_pListenSocket && !g_bVMPISDKMode ) { for ( int iTest=m_iListenPort+1; iTest <= VMPI_MASTER_LAST_PORT; iTest++ ) { m_iDownloaderListenPort = iTest; if ( m_iDownloaderListenPort == m_iListenPort ) continue; m_pDownloaderListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iDownloaderListenPort ); if ( m_pDownloaderListenSocket ) break; } } } if ( !m_pListenSocket || (!g_bVMPISDKMode && !m_pDownloaderListenSocket) ) { Error( "Can't bind a listen socket in port range [%d, %d].", VMPI_MASTER_PORT_FIRST, VMPI_MASTER_PORT_LAST ); } // Create a socket to broadcast from unless we're in the SDK in which case we don't broadcast. m_bPatching = false; if ( m_RunMode == VMPI_RUN_NETWORKED && !g_bVMPISDKMode ) { m_pSocket = CreateIPSocket(); if ( !m_pSocket->BindToAny( 0 ) ) Error( "MPI_Init_Master: can't bind a socket" ); m_BroadcastInfo.m_bForcePatch = false; if ( bPatchMode ) { m_bPatching = true; if ( VMPI_FindArg( argc, argv, "-mpi_ForcePatch", NULL ) ) m_BroadcastInfo.m_bForcePatch = true; const char *pArg = VMPI_FindArg( argc, argv, "-mpi_PatchVersion", "0" ); float iPatchVersion = atof( pArg ); if ( iPatchVersion <= 0 || iPatchVersion >= ((1 << 15) - 1) ) { Error( "-mpi_PatchVersion - val must be between 1.0 and 32767.0" ); } V_strncpy( m_BroadcastInfo.m_PatchVersion, pArg, sizeof( m_BroadcastInfo.m_PatchVersion ) ); } else { m_BroadcastInfo.m_PatchVersion[0] = 0; } // Come up with a unique job ID. m_BroadcastInfo.m_JobID[0] = GetCurMicrosecondsAndSleep( 1 ); m_BroadcastInfo.m_JobID[1] = GetCurMicrosecondsAndSleep( 1 ); m_BroadcastInfo.m_JobID[2] = GetCurMicrosecondsAndSleep( 1 ); m_BroadcastInfo.m_JobID[3] = GetCurMicrosecondsAndSleep( 1 ); const char *pPassword = VMPI_FindArg( argc, argv, "-mpi_pw", "" ); Q_strncpy( m_BroadcastInfo.m_Password, pPassword ? pPassword : "", sizeof( m_BroadcastInfo.m_Password ) ); Q_strncpy( m_BroadcastInfo.m_WorkerExeFilename, dependencyInfo.m_OriginalExeFilename, sizeof( m_BroadcastInfo.m_WorkerExeFilename ) ); // Store the command-line args. m_BroadcastInfo.m_Args.Purge(); for ( int i=1; i < argc; i++ ) { m_BroadcastInfo.m_Args.AddToTail( CopyString( argv[i] ) ); } // 0th arg is the exe name. m_BroadcastInfo.m_Args.InsertBefore( 0, CopyString( m_BroadcastInfo.m_WorkerExeFilename ) ); // Now add arguments for each file they need to transmit. The service will use this to get all the files from the master before it starts the app. for ( int i=0; i < dependencyInfo.m_Files.Count(); i++ ) { m_BroadcastInfo.m_Args.InsertAfter( 0, "-mpi_file" ); m_BroadcastInfo.m_Args.InsertAfter( 1, CopyString( dependencyInfo.m_Files[i]->m_Name ) ); } // Add -mpi_filebase so it can use absolute paths with the filesystem so we get the exact right set of files. m_BroadcastInfo.m_Args.InsertAfter( 0, "-mpi_filebase" ); m_BroadcastInfo.m_Args.InsertAfter( 1, CopyString( dependencyInfo.m_DependencyFilesDir ) ); if ( bPatchMode ) { GetPatchWorkerList( argc, argv ); } } // Add ourselves as the first process (rank 0). m_ConnectionCreator.CreateNewHandler(); // Initiate as many connections as we can for a few seconds. m_LastSendTime = Plat_MSTime() - MASTER_BROADCAST_INTERVAL*2; m_hShutdownEvent.Init( false, false ); m_hShutdownReply.Init( false, false ); DWORD dwThreadID = 0; m_hThread = CreateThread( NULL, 0, &CMasterBroadcaster::StaticThreadFn, this, 0, &dwThreadID ); if ( m_hThread ) { SetThreadPriority( m_hThread, THREAD_PRIORITY_HIGHEST ); return true; } else { return false; } } void CMasterBroadcaster::BuildBroadcastPacket( bf_write &buf ) { // Broadcast out to tell all the machines we want workers. buf.WriteByte( VMPI_PROTOCOL_VERSION ); buf.WriteString( m_BroadcastInfo.m_Password ); if ( m_BroadcastInfo.m_PatchVersion[0] == 0 ) buf.WriteByte( VMPI_LOOKING_FOR_WORKERS ); else buf.WriteByte( VMPI_SERVICE_PATCH ); buf.WriteString( m_BroadcastInfo.m_PatchVersion ); buf.WriteLong( m_iListenPort ); // Tell the port that we're listening on. buf.WriteLong( m_BroadcastInfo.m_JobID[0] ); buf.WriteLong( m_BroadcastInfo.m_JobID[1] ); buf.WriteLong( m_BroadcastInfo.m_JobID[2] ); buf.WriteLong( m_BroadcastInfo.m_JobID[3] ); buf.WriteWord( m_BroadcastInfo.m_Args.Count() + 2 ); // Write the alternate exe name. buf.WriteString( m_BroadcastInfo.m_WorkerExeFilename ); // Write the machine name of the master into the command line. It's ignored by the code, but it's useful // if a job crashes the workers - by looking at the command line in vmpi_service, you can see who ran the job. buf.WriteString( "-mpi_MasterName" ); buf.WriteString( VMPI_GetLocalMachineName() ); for ( int i=1; i < m_BroadcastInfo.m_Args.Count(); i++ ) buf.WriteString( m_BroadcastInfo.m_Args[i] ); buf.WriteByte( (unsigned char)m_BroadcastInfo.m_bForcePatch ); buf.WriteShort( m_iDownloaderListenPort ); // Tell the port that we're listening for downloaders on. } bool CMasterBroadcaster::Update() { CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); // Don't accept any more connections when we've hit the limit. int nActiveConnections, nServiceDownloaders; CountActiveConnections( &nActiveConnections, &nServiceDownloaders ); if ( nActiveConnections >= m_nMaxWorkers ) return false; // Only broadcast our presence so often. if ( m_pSocket ) { DWORD curTime = Plat_MSTime(); if ( curTime - m_LastSendTime >= MASTER_BROADCAST_INTERVAL ) { char packetData[512]; bf_write packetBuf( "packetBuf", packetData, sizeof( packetData ) ); BuildBroadcastPacket( packetBuf ); for ( int iBroadcastPort=VMPI_SERVICE_PORT; iBroadcastPort <= VMPI_LAST_SERVICE_PORT; iBroadcastPort++ ) { if ( m_bPatching ) { // Only send to this specific list of workers if necessary. for ( int i=0; i < m_PatchWorkerIPs.Count(); i++ ) { CIPAddr addr = m_PatchWorkerIPs[i]; addr.port = iBroadcastPort; m_pSocket->SendTo( &addr, packetBuf.GetBasePointer(), packetBuf.GetNumBytesWritten() ); } } else { m_pSocket->Broadcast( packetBuf.GetBasePointer(), packetBuf.GetNumBytesWritten(), iBroadcastPort ); } } // We don't want them to keep patching over and over. if ( m_PatchWorkerIPs.Count() > 0 && m_BroadcastInfo.m_bForcePatch ) m_PatchWorkerIPs.Purge(); m_LastSendTime = curTime; } } // First look for normal workers. IThreadedTCPSocket *pNewConn = NULL; bool bRet = m_pListenSocket->Update( &pNewConn, 0 ); // Now look for downloaders. if ( !bRet || !pNewConn ) { if ( m_pDownloaderListenSocket ) { int nDownloadersAllowed = (m_nMaxWorkers - nActiveConnections) + 8; // Don't allow too many downloaders. if ( nServiceDownloaders < nDownloadersAllowed ) bRet = m_pDownloaderListenSocket->Update( &pNewConn, 0 ); } } if ( bRet && pNewConn ) { // Mark this guy as a downloader if necessary. CIPAddr remoteAddr = pNewConn->GetRemoteAddr(); if ( remoteAddr.port >= VMPI_SERVICE_DOWNLOADER_PORT_FIRST && remoteAddr.port <= VMPI_SERVICE_DOWNLOADER_PORT_LAST ) { CVMPIConnection *pVMPIConnection = FindConnectionBySocket( pNewConn, false ); if ( pVMPIConnection ) pVMPIConnection->m_bIsAService = true; } // Send this guy all the persistent packets. CCriticalSectionLock csLock( &g_PersistentPacketsCS ); csLock.Lock(); FOR_EACH_LL( g_PersistentPackets, iPacket ) { PersistentPacket *pPacket = g_PersistentPackets[iPacket]; VMPI_SendData( pPacket->Base(), pPacket->Count(), g_nConnections-1 ); } UpdateActiveConnectionsText(); return true; } else { return false; } } void CMasterBroadcaster::ThreadFn() { // Update every 100ms or until the main thread tells us to go away. while ( WaitForSingleObject( m_hShutdownEvent.GetEventHandle(), 20 ) == WAIT_TIMEOUT ) { DWORD startTime = GetTickCount(); while ( Update() && (GetTickCount() - startTime) < 500 ) { } } m_hShutdownReply.SetEvent(); } DWORD CMasterBroadcaster::StaticThreadFn( LPVOID lpParameter ) { ((CMasterBroadcaster*)lpParameter)->ThreadFn(); return 0; } void CMasterBroadcaster::Term() { // Shutdown the update thread. if ( m_hThread ) { m_hShutdownEvent.SetEvent(); WaitForSingleObject( m_hThread, INFINITE ); CloseHandle( m_hThread ); m_hThread = 0; } if ( m_pSocket ) { m_pSocket->Release(); m_pSocket = NULL; } if ( m_pListenSocket ) { m_pListenSocket->Release(); m_pListenSocket = NULL; } if ( m_pDownloaderListenSocket ) { m_pDownloaderListenSocket->Release(); m_pDownloaderListenSocket = NULL; } m_iListenPort = -1; m_iDownloaderListenPort = -1; } int CMasterBroadcaster::GetListenPort() const { return m_iListenPort; } int CMasterBroadcaster::GetMaxWorkers() const { return m_nMaxWorkers; } void CMasterBroadcaster::IncreaseMaxWorkers( int count ) { CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); m_nMaxWorkers = min( MAX_VMPI_CONNECTIONS, m_nMaxWorkers + count ); } void CMasterBroadcaster::SetPassword( const char *pPassword ) { CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); Q_strncpy( m_BroadcastInfo.m_Password, pPassword, sizeof( m_BroadcastInfo.m_Password ) ); } void CMasterBroadcaster::SetNoTimeoutOption() { CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); // Don't re-add the option if it's already there. for ( int i=1; i < m_BroadcastInfo.m_Args.Count(); i++ ) { if ( Q_stricmp( m_BroadcastInfo.m_Args[i], VMPI_GetParamString( mpi_NoTimeout ) ) == 0 ) return; } m_BroadcastInfo.m_Args.InsertAfter( 0, (char*)VMPI_GetParamString( mpi_NoTimeout ) ); } CMasterBroadcaster g_MasterBroadcaster; // ---------------------------------------------------------------------------------------- // // CDispatchReg. // ---------------------------------------------------------------------------------------- // CDispatchReg::CDispatchReg( int iPacketID, VMPIDispatchFn fn ) { Assert( iPacketID >= 0 && iPacketID < MAX_VMPI_PACKET_IDS ); Assert( !g_VMPIDispatch[iPacketID] ); g_VMPIDispatch[iPacketID] = fn; } void VMPI_HandleTimingWait_Worker() { if ( VMPI_IsParamUsed( mpi_TimingWait ) ) { Msg( "-mpi_TimingWait specified. Waiting for master to start..." ); // Wait for the signal to go. while ( !g_bTimingWaitDone ) { VMPI_DispatchNextMessage( 50 ); } Msg( "\n "); } } void VMPI_HandleTimingWait_Master() { if ( VMPI_IsParamUsed( mpi_TimingWait ) ) { Msg( "-mpi_TimingWait specified. Waiting for a keypress to continue... " ); getch(); Msg( "\n" ); unsigned char cPacket[2] = { VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE }; VMPI_SendData( cPacket, sizeof( cPacket ), VMPI_PERSISTENT ); } } // ---------------------------------------------------------------------------------------- // // Helpers. // ---------------------------------------------------------------------------------------- // bool MPI_Init_Worker( int &argc, char **&argv, const CIPAddr &masterAddr, bool bConnectingAsService ) { g_bMPIMaster = false; // Make a connector to try connect to the master. CVMPIConnectionCreator connectionCreator; int iFirstPort = VMPI_WORKER_PORT_FIRST; int iLastPort = VMPI_WORKER_PORT_LAST; if ( bConnectingAsService ) { iFirstPort = VMPI_SERVICE_DOWNLOADER_PORT_FIRST; iLastPort = VMPI_SERVICE_DOWNLOADER_PORT_LAST; } // Now wait for a connection. int nAttempts = 1; Retry:; ITCPConnectSocket *pConnectSocket = NULL; int iPort; for ( iPort=iFirstPort; iPort <= iLastPort; iPort++ ) { pConnectSocket = ThreadedTCP_CreateConnector( masterAddr, CIPAddr( 0, 0, 0, 0, iPort ), &connectionCreator ); if ( pConnectSocket ) break; } if ( !pConnectSocket ) { Error( "Can't bind a port in range [%d, %d].", iFirstPort, iLastPort ); } CWaitTimer wait( 3 ); while ( 1 ) { IThreadedTCPSocket *pSocket = NULL; if ( pConnectSocket->Update( &pSocket, 100 ) ) { if ( pSocket ) { // Send the master our machine name. VMPI_SendMachineNameTo( VMPI_MASTER_ID ); // Verify that the exe is correct. VMPI_ReceiveExeName(); if ( g_bVMPISDKMode ) { VMPI_ReceiveCommandLine(); CommandLine()->CreateCmdLine( g_WorkerCommandLine.Count(), g_WorkerCommandLine.Base() ); argc = g_WorkerCommandLine.Count(); argv = g_WorkerCommandLine.Base(); } ParseOptions( g_WorkerCommandLine.Count(), g_WorkerCommandLine.Base() ); for ( int i=0; i < g_WorkerCommandLine.Count(); i++ ) { Msg( "arg %d: %s\n", i, g_WorkerCommandLine[i] ); } VMPI_HandleTimingWait_Worker(); return true; } } else { pConnectSocket->Release(); Error( "ITCPConnectSocket::Update() errored out" ); } if( wait.ShouldKeepWaiting() ) Sleep( 100 ); else break; }; // Never made a connection, shucks. pConnectSocket->Release(); if ( VMPI_IsParamUsed( mpi_Retry ) ) { Msg( "%s found. Retrying connection to %d.%d.%d.%d:%d (attempt %d).\n", VMPI_GetParamString( mpi_Retry ), masterAddr.ip[0], masterAddr.ip[1], masterAddr.ip[2], masterAddr.ip[3], masterAddr.port, nAttempts++ ); goto Retry; } return false; } bool SpawnLocalWorker( int argc, char **argv, int iListenPort, bool bShowConsoleWindow ) { char commandLine[4096]; commandLine[0] = 0; // Add the -mpi_worker argument in, then launch the process. for ( int i=0; i < 9999999; i++ ) { char argStr[512]; if ( i == 1 ) { Q_snprintf( argStr, sizeof( argStr ), "-mpi_worker 127.0.0.1:%d ", iListenPort ); Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS ); Q_strncat( commandLine, "-allowdebug ", sizeof( commandLine ), COPY_ALL_CHARACTERS ); // Add -mpi_SDKMode if it's needed. This would mostly only occur in a debugging situation // (someone running out of rel using -mpi_AutoLocalWorker). if ( VMPI_IsSDKMode() && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_SDKMode ), "" ) ) { Q_strncat( commandLine, VMPI_GetParamString( mpi_SDKMode ), sizeof( commandLine ), COPY_ALL_CHARACTERS ); } } if ( i >= argc ) break; Q_snprintf( argStr, sizeof( argStr ), "\"%s\" ", argv[i] ); Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS ); } char workingDir[1024]; if ( !_getcwd( workingDir, sizeof( workingDir ) ) ) { Warning( "_getcwd() failed.\n" ); return false; } STARTUPINFO si; memset( &si, 0, sizeof( si ) ); si.cb = sizeof( si ); PROCESS_INFORMATION pi; memset( &pi, 0, sizeof( pi ) ); if ( CreateProcess( NULL, commandLine, NULL, // security NULL, TRUE, (bShowConsoleWindow ? CREATE_NEW_CONSOLE : CREATE_NO_WINDOW) | IDLE_PRIORITY_CLASS, // flags NULL, // environment workingDir, // current directory (use c:\\ because we don't want it to accidentally share // DLLs like vstdlib with us). &si, &pi ) ) { return true; } else { char errStr[1024]; IP_GetLastErrorString( errStr, sizeof( errStr ) ); Warning( " - ERROR in CreateProcess (%s)!\n", errStr ); return false; } } bool InitMaster( int argc, char **argv, const char *pDependencyFilename, VMPIRunMode runMode, bool bPatchMode ) { int nMaxWorkers = -1; const char *pProcCount = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_WorkerCount ) ); if ( pProcCount ) { nMaxWorkers = atoi( pProcCount ); Warning( "%s: waiting for %d processes to join.\n", VMPI_GetParamString( mpi_WorkerCount ), nMaxWorkers ); } else { nMaxWorkers = DEFAULT_MAX_WORKERS; } nMaxWorkers = clamp( nMaxWorkers, 2, MAX_VMPI_CONNECTIONS ); g_bMPIMaster = true; g_nMaxWorkerCount = nMaxWorkers; if ( argc <= 0 ) Error( "MPI_Init_Master: argc <= 0!" ); ParseOptions( argc, argv ); // Send the base filename of the exe we're running. Sometimes if we run vvis followed by vrad // really quickly, the old vvis workers can connect to the vrad process and mess with it. VMPI_SendExeName(); // In SDK mode, the master sends the command line to the workers since // the workers weren't given a full command line by vmpi_service. if ( VMPI_IsSDKMode() ) { VMPI_SendCommandLine( argc, argv ); } if ( !g_MasterBroadcaster.Init( argc, argv, pDependencyFilename, nMaxWorkers, runMode, bPatchMode ) ) return false; bool bRet; if ( runMode == VMPI_RUN_LOCAL ) { bRet = SpawnLocalWorker( argc, argv, g_MasterBroadcaster.GetListenPort(), false ); } else { if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_AutoLocalWorker ), "" ) ) { Msg( "%s found. Spawning a local worker automatically.\n", VMPI_GetParamString( mpi_AutoLocalWorker ) ); SpawnLocalWorker( 1, argv, g_MasterBroadcaster.GetListenPort(), true ); } bRet = true; } VMPI_HandleTimingWait_Master(); return bRet; } void VMPI_InitGlobals( int argc, char **argv, VMPIRunMode runMode ) { g_bUseMPI = true; g_VMPIRunMode = runMode; // Init event objects. g_VMPIMessagesEvent.Init( false, false ); g_ErrorSocketsEvent.Init( false, false ); // Load this for GetConsoleWindow(). g_hKernel32DLL = LoadLibrary( "kernel32.dll" ); if ( g_hKernel32DLL ) { g_pConsoleWndFn = (GetConsoleWndFn)GetProcAddress( g_hKernel32DLL, "GetConsoleWindow" ); } #if defined( _DEBUG ) for ( int iArg=0; iArg < argc; iArg++ ) { Warning( "%s\n", argv[iArg] ); } Warning( "\n" ); #endif } bool VMPI_CheckForNonSDKExecutables() { char baseExeFilename[512]; if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." ); V_StripLastDir( baseExeFilename, sizeof( baseExeFilename ) ); V_AppendSlash( baseExeFilename, sizeof( baseExeFilename ) ); V_strncat( baseExeFilename, "mysql_wrapper.dll", sizeof( baseExeFilename ) ); // If vmpi_transfer.exe doesn't exist, then we assume we're in SDK mode. return ( _access( baseExeFilename, 0 ) == 0 ); } bool IsValidSDKBinPath( CUtlVector< char* > &outStrings, int *pError ) { *pError = 0; // Minimum must have drive:/basedir/steamapps/name/sourcesdk/bin/[ep1|orangebox]/bin/exename if ( outStrings.Count() < 9 ) { *pError = 0; return false; } if ( V_stricmp( outStrings[outStrings.Count()-2], "bin" ) != 0 ) { *pError = 1; return false; } if ( V_stricmp( outStrings[outStrings.Count()-5], "sourcesdk" ) != 0 ) { *pError = 2; return false; } if ( V_stricmp( outStrings[outStrings.Count()-7], "steamapps" ) != 0 ) { *pError = 3; return false; } // Check the last-access date on clientregistry.blob char baseSteamPath[MAX_PATH]; V_strncpy( baseSteamPath, outStrings[0], sizeof( baseSteamPath) ); for ( int i=1; i < outStrings.Count() - 7; i++ ) { V_AppendSlash( baseSteamPath, sizeof( baseSteamPath ) ); V_strncat( baseSteamPath, outStrings[i], sizeof( baseSteamPath ) ); } char blobPath[MAX_PATH]; V_ComposeFileName( baseSteamPath, "ClientRegistry.blob", blobPath, sizeof( blobPath ) ); struct _stat results; if ( _stat( blobPath, &results ) != 0 ) { *pError = 4; return false; } long curTime; VCRHook_Time( &curTime ); int nSecondsSinceLastSteamAccess = curTime - results.st_mtime; int nSecondsPerDay = 60 * 60 * 24; int nMaxDaysUnaccessed = 10; if ( nSecondsSinceLastSteamAccess > nSecondsPerDay*nMaxDaysUnaccessed ) { *pError = 5; // NOTE: don't change this error code because the outer function checks for it. return false; } // Check for some of the files under sourcesdk_content. char sourcesdkContentPath[MAX_PATH]; V_strncpy( sourcesdkContentPath, outStrings[0], sizeof( sourcesdkContentPath ) ); for ( int i=1; i < outStrings.Count() - 5; i++ ) { V_AppendSlash( sourcesdkContentPath, sizeof( sourcesdkContentPath ) ); V_strncat( sourcesdkContentPath, outStrings[i], sizeof( sourcesdkContentPath ) ); } V_AppendSlash( sourcesdkContentPath, sizeof( sourcesdkContentPath ) ); V_strncat( sourcesdkContentPath, "sourcesdk_content", sizeof( sourcesdkContentPath ) ); char tempFilename[MAX_PATH], mapsrcFilename[MAX_PATH]; V_snprintf( tempFilename, sizeof( tempFilename ), "cstrike%cmapsrc", CORRECT_PATH_SEPARATOR ); V_ComposeFileName( sourcesdkContentPath, tempFilename, mapsrcFilename, sizeof( mapsrcFilename ) ); if ( _access( mapsrcFilename, 0 ) != 0 ) { *pError = 6; return false; } return true; } void VerifyValidSDKMode() { // Make sure we're running out of the SourceSDK directory and that our SDK directories are filled out. char baseExeFilename[MAX_PATH]; if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) Error( "VerifyValidSDKMode: GetModuleFileName failed." ); V_FixSlashes( baseExeFilename ); CUtlVector< char* > outStrings; char strSlash[2] = {CORRECT_PATH_SEPARATOR, 0}; V_SplitString( baseExeFilename, strSlash, outStrings ); int err; if ( !IsValidSDKBinPath( outStrings, &err ) ) { outStrings.PurgeAndDeleteElements(); if ( err == 5 ) Error( "VMPI running in SDK mode but Steam hasn't been run recently. Please run Steam and retry." ); else Error( "VMPI running in SDK mode but incorrect SDK install detected (error %d).", err ); } } void VMPI_CheckSDKMode( int argc, char **argv ) { g_bVMPISDKMode = !VMPI_CheckForNonSDKExecutables(); g_bVMPISDKModeSet = true; // Also check for -mpi_sdkmode (only used in testing). if ( !g_bVMPISDKMode ) { if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_SDKMode ), "" ) ) g_bVMPISDKMode = true; } if ( g_bVMPISDKMode ) { VerifyValidSDKMode(); } if ( g_bVMPISDKMode ) { Msg( "VMPI running in SDK mode.\n" ); } } void VMPI_SetupAutoRestartParameters( int argc, char **argv ) { if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_AutoRestart ) ) ) { g_OriginalCommandLineParameters.SetSize( argc ); for ( int i=0; i < argc; i++ ) { g_OriginalCommandLineParameters[i] = CopyString( argv[i] ); } } } bool VMPI_HandleAutoRestart() { if ( g_OriginalCommandLineParameters.Count() == 0 ) return true; Msg( "%s found. Auto-restarting.\n", VMPI_GetParamString( mpi_AutoRestart ) ); DWORD curPriority = GetPriorityClass( GetCurrentProcess() ); char commandLine[1024*8]; commandLine[0] = 0; // Add the -mpi_worker argument in, then launch the process. for ( int i=0; i < g_OriginalCommandLineParameters.Count(); i++ ) { char argStr[512]; Q_snprintf( argStr, sizeof( argStr ), "\"%s\" ", g_OriginalCommandLineParameters[i] ); Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS ); } STARTUPINFO si; memset( &si, 0, sizeof( si ) ); si.cb = sizeof( si ); PROCESS_INFORMATION pi; memset( &pi, 0, sizeof( pi ) ); if ( CreateProcess( NULL, commandLine, NULL, // security NULL, TRUE, CREATE_NEW_CONSOLE | curPriority, // flags NULL, // environment NULL, &si, &pi ) ) { g_OriginalCommandLineParameters.Purge(); return true; } else { char errStr[1024]; IP_GetLastErrorString( errStr, sizeof( errStr ) ); Warning( " - ERROR in CreateProcess (%s)!\n", errStr ); return false; } } bool VMPI_Init( int &argc, char **&argv, const char *pDependencyFilename, VMPI_Disconnect_Handler handler, VMPIRunMode runMode, bool bConnectingAsService ) { if ( handler ) VMPI_AddDisconnectHandler( handler ); VMPI_SetupAutoRestartParameters( argc, argv ); VMPI_CheckSDKMode( argc, argv ); VMPI_InitGlobals( argc, argv, runMode ); // Were we launched by the vmpi service as a worker? const char *pMasterIP = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), NULL ); if ( pMasterIP ) { CIPAddr addr; addr.port = VMPI_MASTER_FIRST_PORT; if ( !ConvertStringToIPAddr( pMasterIP, &addr ) ) Error( "Unable to parse or resolve master IP (%s).\n", pMasterIP ); return MPI_Init_Worker( argc, argv, addr, bConnectingAsService ); } else { if ( !pDependencyFilename ) { Error( "VMPI started as master, but no dependency filename specified.\n" ); return false; } return InitMaster( argc, argv, pDependencyFilename, runMode, false ); } } void VMPI_Init_PatchMaster( int argc, char **argv ) { const char *pPatchDirectory = VMPI_FindArg( argc, argv, "-mpi_PatchDirectory", NULL ); if ( !pPatchDirectory ) Error( "-mpi_PatchDirectory must be specified if using -PatchHost mode." ); VMPI_InitGlobals( argc, argv, VMPI_RUN_NETWORKED ); InitMaster( argc, argv, pPatchDirectory, VMPI_RUN_NETWORKED, true ); } void VMPI_Finalize() { g_MasterBroadcaster.Term(); DistributeWork_Cancel(); // Get rid of all the sockets. for ( int iConn=0; iConn < g_nConnections; iConn++ ) delete g_Connections[iConn]; g_nConnections = 0; // Get rid of all the packets. FOR_EACH_LL( g_VMPIMessages, i ) { g_VMPIMessages[i]->Release(); } g_VMPIMessages.Purge(); g_PersistentPackets.PurgeAndDeleteElements(); // Get rid of the message buffers g_DispatchBuffers.Purge(); if ( g_hKernel32DLL ) { FreeLibrary( g_hKernel32DLL ); g_hKernel32DLL = NULL; } g_WorkerCommandLine.PurgeAndDeleteElements(); VMPI_HandleAutoRestart(); } VMPIRunMode VMPI_GetRunMode() { return g_VMPIRunMode; } VMPIFileSystemMode VMPI_GetFileSystemMode() { return g_VMPIFileSystemMode; } int VMPI_GetCurrentNumberOfConnections() { return g_nConnections; } void InternalHandleSocketErrors() { // Copy the list of sockets with errors into a local array so we can handle all the errors outside // the mutex, thus avoiding potential deadlock if any error handlers call Error(). CUtlVector errorSockets; CCriticalSectionLock csLock( &g_ErrorSocketsCS ); csLock.Lock(); errorSockets.SetSize( g_ErrorSockets.Count() ); int iCur = 0; FOR_EACH_LL( g_ErrorSockets, i ) { errorSockets[iCur++] = g_ErrorSockets[i]; } g_ErrorSockets.Purge(); csLock.Unlock(); // Handle the errors. for ( int i=0; i < errorSockets.Count(); i++ ) { errorSockets[i]->HandleDisconnect(); } UpdateActiveConnectionsText(); } void VMPI_HandleSocketErrors( unsigned long timeout ) { DWORD ret = WaitForSingleObject( g_ErrorSocketsEvent.GetEventHandle(), timeout ); if ( ret == WAIT_OBJECT_0 ) { InternalHandleSocketErrors(); } } // If bWait is false, then this function returns false immediately if there are no messages waiting. bool VMPI_GetNextMessage( MessageBuffer *pBuf, int *pSource, unsigned long startTimeout ) { HANDLE handles[2] = { g_ErrorSocketsEvent.GetEventHandle(), g_VMPIMessagesEvent.GetEventHandle() }; DWORD startTime = Plat_MSTime(); DWORD timeout = startTimeout; while ( 1 ) { DWORD ret = WaitForMultipleObjects( ARRAYSIZE( handles ), handles, FALSE, timeout ); if ( ret == WAIT_TIMEOUT ) { return false; } else if ( ret == WAIT_OBJECT_0 ) { // A socket had an error. Handle all socket errors. InternalHandleSocketErrors(); // Update the timeout. DWORD delta = Plat_MSTime() - startTime; if ( delta >= startTimeout ) return false; timeout = startTimeout - delta; continue; } else if ( ret == (WAIT_OBJECT_0 + 1) ) { // Read out the next message. CCriticalSectionLock csLock( &g_VMPIMessagesCS ); csLock.Lock(); GrabNextMessage:; int iHead = g_VMPIMessages.Head(); CTCPPacket *pPacket = g_VMPIMessages[iHead]; g_VMPIMessages.Remove( iHead ); // Set the event again if there are more messages waiting. const char *pBase = pPacket->GetData(); if ( pPacket->GetLen() >= 6 && (unsigned char)pBase[0] == VMPI_INTERNAL_PACKET_ID && (unsigned char)pBase[1] == VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET ) { // Ok, this is a grouped packet. Split it out into a bunch of separate packets. CUtlVector groupedPackets; int iCurOffset = 2; while ( (iCurOffset+4) <= pPacket->GetLen() ) { int curPacketLen = *((int*)&pBase[iCurOffset]); if ( iCurOffset + curPacketLen > pPacket->GetLen() ) Error( "Invalid chunked packet\n" ); iCurOffset += 4; CTCPPacket *pChunkPacket = (CTCPPacket*)malloc( sizeof( CTCPPacket ) + curPacketLen - 1 ); pChunkPacket->m_Len = curPacketLen; pChunkPacket->m_UserData = pPacket->m_UserData; memcpy( pChunkPacket->m_Data, &pBase[iCurOffset], curPacketLen ); groupedPackets.AddToTail( pChunkPacket ); iCurOffset += curPacketLen; } for ( int i=0; i < groupedPackets.Count(); i++ ) { g_VMPIMessages.AddToHead( groupedPackets[groupedPackets.Count() - i - 1] ); } pPacket->Release(); goto GrabNextMessage; } else { if ( g_VMPIMessages.Count() > 0 ) g_VMPIMessagesEvent.SetEvent(); } csLock.Unlock(); // Copy it into their message buffer. pBuf->setLen( pPacket->GetLen() ); memcpy( pBuf->data, pPacket->GetData(), pPacket->GetLen() ); *pSource = pPacket->GetUserData(); Assert( *pSource >= 0 && *pSource < g_nConnections ); // Update global stats about how much data we've received. ++g_nMessagesReceived; g_nBytesReceived += pPacket->GetLen() + 4; // (4 bytes extra for the packet length) // Free the memory associated with the packet. pPacket->Release(); return true; } else { Error( "VMPI_GetNextMessage: WaitForSingleObject returned %lu", ret ); return false; } } } bool VMPI_InternalDispatch( MessageBuffer *pBuf, int iSource ) { if ( pBuf->getLen() >= 1 && pBuf->data[0] >= 0 && pBuf->data[0] < MAX_VMPI_PACKET_IDS && g_VMPIDispatch[pBuf->data[0]] ) { return g_VMPIDispatch[ pBuf->data[0] ]( pBuf, iSource, pBuf->data[0] ); } else { return false; } } bool VMPI_DispatchNextMessage( unsigned long timeout ) { MessageBuffer *pBuf = NULL; if ( !g_DispatchBuffers.PopItem( &pBuf ) ) { pBuf = new MessageBuffer(); } bool bRetval = true; while ( 1 ) { int iSource; if ( VMPI_GetNextMessage( pBuf, &iSource, timeout ) ) { if ( VMPI_InternalDispatch( pBuf, iSource ) ) { break; } else { // Workers running in service mode don't hook anything except filesystem stuff, so if they happen to be sent something, no problem. if ( !VMPI_IsProcAService( iSource ) ) { // Oops! What is this packet? Assert( false ); } } } else { bRetval = false; break; } } g_DispatchBuffers.PushItem( pBuf ); return bRetval; } bool VMPI_DispatchUntil( MessageBuffer *pBuf, int *pSource, int packetID, int subPacketID, bool bWait ) { while ( 1 ) { if ( !VMPI_GetNextMessage( pBuf, pSource, bWait ? VMPI_TIMEOUT_INFINITE : 0 ) ) return false; if ( !VMPI_InternalDispatch( pBuf, *pSource ) ) { if ( pBuf->getLen() >= 1 && (unsigned char)pBuf->data[0] == packetID ) { if ( subPacketID == -1 ) return true; if ( pBuf->getLen() >= 2 && (unsigned char)pBuf->data[1] == subPacketID ) return true; } // Oops! What is this packet? // Note: the most common case where this happens is if it finishes a BuildFaceLights run // and is in an AppBarrier and one of the workers is still finishing up some work given to it. // It'll be waiting for a barrier packet, and it'll get results. In that case, the packet should // be discarded like we do here, so maybe this assert won't be necessary. //Assert( false ); } } } bool VMPI_SendData( void *pData, int nBytes, int iDest, int fVMPISendFlags ) { return VMPI_SendChunks( &pData, &nBytes, 1, iDest, fVMPISendFlags ); } inline bool VMPI_FilterPacketsForServiceDownloader( CVMPIConnection *pConnection, void const * const *pChunks, const int *pChunkLengths, int nChunks ) { if ( pConnection->m_bIsAService ) { // Find the first byte and treat that as the packet ID. for ( int i=0; i < nChunks; i++ ) { if ( pChunkLengths[i] > 0 ) { unsigned char cPacketID = *((unsigned char*)pChunks[i]); if ( cPacketID == VMPI_INTERNAL_PACKET_ID || cPacketID == VMPI_SHARED_PACKET_ID || cPacketID == VMPI_PACKETID_FILESYSTEM ) return false; else return true; } } } return false; } void VMPI_GroupPackets( CVMPIConnection *pConn, void const * const *pChunks, const int *pChunkLengths, int nChunks ) { CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); // First add the header. if ( pConn->m_GroupedChunks.Count() == 0 ) { pConn->m_GroupedChunks.AddToTail( g_GroupedPacketHeader ); pConn->m_GroupedChunkLengths.AddToTail( sizeof( g_GroupedPacketHeader ) ); } // Collate the chunks. int nTotalLength = 0; for ( int i=0; i < nChunks; i++ ) nTotalLength += pChunkLengths[i]; char *pOut = new char[nTotalLength + 4]; *((int*)pOut) = nTotalLength; int iOutByte = 4; for ( int i=0; i < nChunks; i++ ) { memcpy( &pOut[iOutByte], pChunks[i], pChunkLengths[i] ); iOutByte += pChunkLengths[i]; } pConn->m_GroupedChunks.AddToTail( pOut ); pConn->m_GroupedChunkLengths.AddToTail( nTotalLength + 4 ); } void VMPI_FlushGroupedPackets( unsigned long msInterval ) { if ( msInterval != 0 ) { unsigned long curTime = Plat_MSTime(); if ( curTime - g_LastFlushGroupedPacketsTime < msInterval ) return; g_LastFlushGroupedPacketsTime = curTime; } CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); for ( int i=0; i < g_nConnections; i++ ) { CVMPIConnection *pConn = g_Connections[i]; if ( !pConn ) continue; IThreadedTCPSocket *pSocket = pConn->GetSocket(); if ( !pSocket || pConn->m_GroupedChunks.Count() == 0 ) continue; pSocket->SendChunks( pConn->m_GroupedChunks.Base(), pConn->m_GroupedChunkLengths.Base(), pConn->m_GroupedChunks.Count() ); // Free the chunks. for ( int i=1; i < pConn->m_GroupedChunks.Count(); i++ ) { free( pConn->m_GroupedChunks[i] ); } pConn->m_GroupedChunks.RemoveAll(); pConn->m_GroupedChunkLengths.RemoveAll(); } } bool VMPI_SendChunks( void const * const *pChunks, const int *pChunkLengths, int nChunks, int iDest, int fVMPISendFlags ) { if ( iDest == VMPI_SEND_TO_ALL ) { // Don't want new connections while in here! CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); for ( int i=0; i < g_nConnections; i++ ) VMPI_SendChunks( pChunks, pChunkLengths, nChunks, i ); return true; } else if ( iDest == VMPI_PERSISTENT ) { // Don't want new connections while in here! CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); connectionsLock.Lock(); CCriticalSectionLock csLock( &g_PersistentPacketsCS ); csLock.Lock(); // Send the packet to everyone. for ( int i=0; i < g_nConnections; i++ ) VMPI_SendChunks( pChunks, pChunkLengths, nChunks, i ); // Remember to send it to the new workers. if ( iDest == VMPI_PERSISTENT ) { PersistentPacket *pNew = new PersistentPacket; for ( int i=0; i < nChunks; i++ ) pNew->AddMultipleToTail( pChunkLengths[i], (const char*)pChunks[i] ); g_PersistentPackets.AddToTail( pNew ); } return true; } else { g_nMessagesSent++; g_nBytesSent += 4; // for message tag. for ( int i=0; i < nChunks; i++ ) g_nBytesSent += pChunkLengths[i]; CVMPIConnection *pConnection = g_Connections[iDest]; if ( pConnection ) { // If it's a service downloader, only send certain packet IDs. if ( VMPI_FilterPacketsForServiceDownloader( pConnection, pChunks, pChunkLengths, nChunks ) ) return true; IThreadedTCPSocket *pSocket = pConnection->GetSocket(); if ( !pSocket ) return false; if ( g_bGroupPackets && (fVMPISendFlags & k_eVMPISendFlags_GroupPackets) ) { VMPI_GroupPackets( pConnection, pChunks, pChunkLengths, nChunks ); return true; } else { return pSocket->SendChunks( pChunks, pChunkLengths, nChunks ); } } else { return false; } } } bool VMPI_Send2Chunks( const void *pChunk1, int chunk1Len, const void *pChunk2, int chunk2Len, int iDest, int fVMPISendFlags ) { const void *pChunks[2] = { pChunk1, pChunk2 }; int len[2] = { chunk1Len, chunk2Len }; return VMPI_SendChunks( pChunks, len, ARRAYSIZE( pChunks ), iDest, fVMPISendFlags ); } bool VMPI_Send3Chunks( const void *pChunk1, int chunk1Len, const void *pChunk2, int chunk2Len, const void *pChunk3, int chunk3Len, int iDest, int fVMPISendFlags ) { const void *pChunks[3] = { pChunk1, pChunk2, pChunk3 }; int len[3] = { chunk1Len, chunk2Len, chunk3Len }; return VMPI_SendChunks( pChunks, len, ARRAYSIZE( pChunks ), iDest, fVMPISendFlags ); } void VMPI_AddDisconnectHandler( VMPI_Disconnect_Handler handler ) { g_DisconnectHandlers.AddToTail( handler ); } CVMPIConnection* GetConnection( int procID ) { Assert( procID >= 0 && procID < g_nConnections ); return g_Connections[procID]; } bool VMPI_IsProcConnected( int procID ) { if ( procID < 0 || procID >= g_nConnections ) { Assert( false ); return false; } return g_Connections[procID]->GetSocket() != NULL; } bool VMPI_IsProcAService( int procID ) { if ( procID < 0 || procID >= g_nConnections ) { Assert( false ); return false; } return g_Connections[procID]->m_bIsAService; } void VMPI_Sleep( unsigned long ms ) { Sleep( ms ); } const char* VMPI_GetMachineName( int iProc ) { if ( g_bMPIMaster && iProc == VMPI_MASTER_ID ) return VMPI_GetLocalMachineName(); if ( iProc < 0 || iProc >= g_nConnections ) { Assert( false ); return "invalid index"; } return g_Connections[iProc]->GetMachineName(); } void VMPI_SetMachineName( int iProc, const char *pName ) { if ( iProc < 0 || iProc >= g_nConnections ) { Assert( false ); return; } g_Connections[iProc]->SetMachineName( pName ); } bool VMPI_HasMachineNameBeenSet( int iProc ) { if ( iProc < 0 || iProc >= g_nConnections ) { Assert( false ); return false; } return g_Connections[iProc]->HasMachineNameBeenSet(); } const char* VMPI_GetLocalMachineName() { static char cName[MAX_COMPUTERNAME_LENGTH+1]; DWORD len = sizeof( cName ); if ( GetComputerName( cName, &len ) ) return cName; else return "(error in GetComputerName)"; } unsigned long VMPI_GetJobWorkerID( int iProc ) { return GetConnection( iProc )->m_JobWorkerID; } void VMPI_SetJobWorkerID( int iProc, unsigned long jobWorkerID ) { GetConnection( iProc )->m_JobWorkerID = jobWorkerID; } void VMPI_GetCurrentStage( char *pOut, int strLen ) { CCriticalSectionLock csLock( &g_CurrentStageCS ); csLock.Lock(); Q_strncpy( pOut, g_CurrentStageString, strLen ); } void VMPI_SetCurrentStage( const char *pCurStage ) { CCriticalSectionLock csLock( &g_CurrentStageCS ); csLock.Lock(); Q_strncpy( g_CurrentStageString, pCurStage, sizeof( g_CurrentStageString ) ); } void VMPI_InviteDebugWorkers() { // Only allow workers with password set to debugworker. g_MasterBroadcaster.SetPassword( "debugworker" ); // Disable timeouts so they can sit in the debugger. g_MasterBroadcaster.SetNoTimeoutOption(); ThreadedTCP_EnableTimeouts( false ); // Let in some more workers. g_MasterBroadcaster.IncreaseMaxWorkers( 25 ); } bool VMPI_IsSDKMode() { if ( g_bVMPISDKModeSet ) return g_bVMPISDKMode; else return !VMPI_CheckForNonSDKExecutables(); } const char* VMPI_GetParamString( EVMPICmdLineParam eParam ) { if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam ) { Assert( false ); Warning( "Invalid call: VMPI_GetParamString( %d )\n", eParam ); return "unknown"; } else { return g_VMPIParams[eParam].m_pName; } } int VMPI_GetParamFlags( EVMPICmdLineParam eParam ) { if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam ) { Assert( false ); Warning( "Invalid call: VMPI_GetParamString( %d )\n", eParam ); return 0; } else { return g_VMPIParams[eParam].m_ParamFlags; } } bool VMPI_IsParamUsed( EVMPICmdLineParam eParam ) { int iParam = CommandLine()->FindParm( VMPI_GetParamString( eParam ) ); return iParam != 0; } const char* VMPI_GetParamHelpString( EVMPICmdLineParam eParam ) { if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam ) { Assert( false ); Warning( "Invalid call: VMPI_GetParamHelpString( %d )\n", eParam ); return "unknown vmpi param"; } else { return g_VMPIParams[eParam].m_pHelpText; } }