//========= Copyright Valve Corporation, All rights reserved. ============// // //=======================================================================================// #include "../utils/bzip2/bzlib.h" #include "sv_filepublish.h" #include "utlstring.h" #include "strtools.h" #include "sv_replaycontext.h" #include "convar.h" #include "fmtstr.h" #include "compression.h" #include "replay/shared_defs.h" #include "spew.h" #include "utlqueue.h" // memdbgon must be the last include file in a .cpp file!!! #include "tier0/memdbgon.h" //---------------------------------------------------------------------------------------- ConVar replay_publish_simulate_delay_local_http( "replay_publish_simulate_delay_local_http", "0", FCVAR_DONTRECORD, "Simulate a delay (in seconds) when publishing replay data via local HTTP.", true, 0.0f, true, 60.0f ); ConVar replay_publish_simulate_rename_fail( "replay_publish_simulate_rename_fail", "0", FCVAR_DONTRECORD, "Simulate a rename failure during local HTTP publishing, which will force a manual copy & delete.", true, 0.0f, true, 1.0f ); //---------------------------------------------------------------------------------------- CBasePublishJob::CBasePublishJob( JobPriority_t nPriority/*=JP_NORMAL*/, ISpewer *pSpewer/*=g_pDefaultSpewer*/ ) : CBaseJob( nPriority, pSpewer ) { } void CBasePublishJob::SimulateDelay( int nDelay, const char *pThreadName ) { if ( nDelay > 0 ) { Log( "%s thread: Simulating %i sec delay.\n", pThreadName, nDelay ); ThreadSleep( nDelay * 1000 ); Log( "%s thread: simulation done.\n", pThreadName ); } } //---------------------------------------------------------------------------------------- CLocalPublishJob::CLocalPublishJob( const char *pLocalFilename ) { V_strcpy( m_szLocalFilename, pLocalFilename ); } JobStatus_t CLocalPublishJob::DoExecute() { DBG( "Attempting to rename file to local fileserver path..." ); PrintEventStartMsg( "Source file exists?" ); if ( !g_pFullFileSystem->FileExists( m_szLocalFilename ) ) { PrintEventResult( false ); CFmtStr fmtError( "Source file '%s' does not exist", m_szLocalFilename ); SetError( ERROR_SOURCE_FILE_DOES_NOT_EXIST, fmtError.Access() ); return JOB_FAILED; } PrintEventResult( true ); // Make sure the publish path exists const char *pFileserverPath = g_pServerReplayContext->GetLocalFileServerPath(); PrintEventStartMsg( "Checking fileserver path" ); if ( !g_pFullFileSystem->IsDirectory( pFileserverPath ) ) { PrintEventResult( false ); CFmtStr fmtError( "Fileserver path '%s' invalid (see replay_local_fileserver_path)", pFileserverPath ); SetError( ERROR_INVALID_FILESERVER_PATH, fmtError.Access() ); return JOB_FAILED; } PrintEventResult( true ); // Format a path & filename that points to the fileserver's download directory, with .dmx on the end const char *pFilename = V_UnqualifiedFileName( m_szLocalFilename ); CFmtStr fmtPublishFilename( "%s%s", pFileserverPath, pFilename ); const char *pTargetFilename = fmtPublishFilename.Access(); // Delete the destination file if it exists already if ( g_pFullFileSystem->FileExists( pTargetFilename ) ) { PrintEventStartMsg( "Target file exists - deleting" ); g_pFullFileSystem->RemoveFile( pTargetFilename ); // Give the system a bit of time before another check ThreadSleep( 500 ); if ( g_pFullFileSystem->FileExists( pTargetFilename ) ) { #ifdef WIN32 LPVOID pMsgBuf; if ( FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language (LPTSTR) &pMsgBuf, 0, NULL )) { Log( "\n\nError: %s\n", (const char *)pMsgBuf ); LocalFree( pMsgBuf ); } #endif PrintEventResult( false ); CFmtStr fmtError( "Target already existed and could not be removed: '%s'", pTargetFilename ); SetError( ERROR_COULD_NOT_DELETE_TARGET_FILE, fmtError.Access() ); return JOB_FAILED; } PrintEventResult( true ); } // Simulate a delay if necessary SimulateDelay( replay_publish_simulate_delay_local_http.GetInt(), "Local HTTP" ); // Rename the file - RenameFile() still returns true, even if the destination pathname // is nonsense. If the *source* is invalid, it fails as expected, though. Adding a FileExists() // does not help. PrintEventStartMsg( "Renaming to target" ); const bool bSimulateRenameFail = replay_publish_simulate_rename_fail.GetBool(); if ( bSimulateRenameFail || !g_pFullFileSystem->RenameFile( m_szLocalFilename, pTargetFilename ) ) { // Try to explicitly copy to target if ( g_pEngine->CopyFile( m_szLocalFilename, pTargetFilename ) ) { // ...and deletion of source. g_pFullFileSystem->RemoveFile( m_szLocalFilename ); } else { PrintEventResult( false ); CFmtStr fmtError( "Failed to rename '%s' -> '%s'\n", m_szLocalFilename, pTargetFilename ); SetError( ERROR_RENAME_FAILED, fmtError.Access() ); return JOB_FAILED; } } PrintEventResult( true ); DBG( "Rename succeeded.\n" ); return JOB_OK; } //---------------------------------------------------------------------------------------- CLocalPublishJob *SV_CreateLocalPublishJob( const char *pLocalFilename ) { return new CLocalPublishJob( pLocalFilename ); } //---------------------------------------------------------------------------------------- CCompressionJob::CCompressionJob( const uint8 *pSrcData, uint32 nSrcSize, CompressorType_t nType, bool *pOutResult, uint32 *pResultSize ) : m_pSrcData( pSrcData ), m_nSrcSize( nSrcSize ), m_pCompressionResult( pOutResult ), m_pResultSize( pResultSize ) { *m_pCompressionResult = false; *m_pResultSize = 0; m_pCompressor = CreateCompressor( nType ); } JobStatus_t CCompressionJob::DoExecute() { IF_REPLAY_DBG2( Warning( "Attempting to compress...\n" ) ); if ( m_nSrcSize == 0 ) { SetError( ERROR_FAILED_ZERO_LENGTH_DATA, "Compression failed. Zero length data." ); return JOB_FAILED; } int nResult = JOB_FAILED; // Attempt to compress the file const int nMaxCompressedSize = ceil( m_nSrcSize * 1.1f ) + 600; // see "destLen" - http://www.bzip.org/1.0.3/html/util-fns.html uint8 *pCompressed = new uint8[ nMaxCompressedSize ]; // Compress unsigned int nCompressedSize; PrintEventStartMsg( "Compressing" ); if ( !m_pCompressor->Compress( (char *)pCompressed, &nCompressedSize, (const char *)m_pSrcData, m_nSrcSize ) ) { // Compression failed? IF_REPLAY_DBG2( Warning( "Could not compress stream.\n" ) ); PrintEventResult( false ); SetError( ERROR_OK_COULDNOTCOMPRESS ); // Set result to uncompressed buffer and free compressed m_pResult = (uint8 *)m_pSrcData; delete [] pCompressed; *m_pCompressionResult = false; *m_pResultSize = m_nSrcSize; } else { PrintEventResult( true ); // Success! DBG( "Compression succeeded.\n" ); nResult = JOB_OK; // Set result to compressed buffer m_pResult = pCompressed; *m_pResultSize = nCompressedSize; *m_pCompressionResult = true; } // Compression would have been worse than not compressing at all return nResult; } void CCompressionJob::GetOutputData( uint8 **ppData, uint32 *pDataSize ) const { *ppData = m_pResult; *pDataSize = *m_pResultSize; } //---------------------------------------------------------------------------------------- CMd5Job::CMd5Job( const void *pSrcData, int nSrcSize, bool *pOutHashed, uint8 *pOutHash, unsigned int pSeed[4]/*=NULL*/ ) : m_pSrcData( pSrcData ), m_nSrcSize( nSrcSize ), m_pHashed( pOutHashed ), m_pHash( pOutHash ), m_pSeed( pSeed ) { *m_pHashed = false; V_memset( pOutHash, 0, 16 ); } JobStatus_t CMd5Job::DoExecute() { IF_REPLAY_DBG2( Warning( "Attempting to hash...\n" ) ); PrintEventStartMsg( "Running" ); bool bResult = g_pEngine->MD5_HashBuffer( m_pHash, (const uint8 *)m_pSrcData, m_nSrcSize, m_pSeed ); PrintEventResult( bResult ); *m_pHashed = bResult; if ( !bResult ) return JOB_FAILED; IF_REPLAY_DBG2( Warning( "Hash succeeded\n" ) ); return JOB_OK; } //---------------------------------------------------------------------------------------- CDeleteLocalFileJob::CDeleteLocalFileJob( const char *pFilename ) { V_strncpy( m_szFilename, pFilename, sizeof( m_szFilename ) - 1 ); } JobStatus_t CDeleteLocalFileJob::DoExecute() { // File exists? if ( !g_pFullFileSystem->FileExists( m_szFilename ) ) { SetError( ERROR_FILE_DOES_NOT_EXISTS ); return JOB_FAILED; } // Attempt to remove the file now g_pFullFileSystem->RemoveFile( m_szFilename ); // Delete succeeded? if ( g_pFullFileSystem->FileExists( m_szFilename ) ) { SetError( ERROR_COULD_NOT_DELETE ); return JOB_FAILED; } return JOB_OK; } //---------------------------------------------------------------------------------------- class CBaseFilePublisher : public IFilePublisher { public: enum Phase_t { PHASE_INVALID = -1, PHASE_COMPRESSION, PHASE_HASH, PHASE_ADJUSTHEADER, PHASE_WRITETODISK, PHASE_PUBLISH, PHASE_DELETEFILE, NUM_PHASES }; CBaseFilePublisher() : m_pCallbackHandler( NULL ), m_pUserData( NULL ), m_pCurrentJob( NULL ), m_pInData( NULL ), m_pHeaderData( NULL ), m_nStatus( PUBLISHSTATUS_INVALID ), m_nPhase( PHASE_INVALID ), m_bCompressedOk( false ), m_bHashedOk( false ), m_nHeaderSize( 0 ), m_nCompressedSize( 0 ), m_nInSize( 0 ), m_nInType( IO_INVALID ) { m_szOutFilename[ 0 ] = 0; V_memset( m_aHash, 0, sizeof( m_aHash ) ); } virtual PublishStatus_t GetStatus() const { return m_nStatus; } void SetStatus( PublishStatus_t nStatus ) { m_nStatus = nStatus; } virtual bool IsDone() const { return m_nStatus != PUBLISHSTATUS_INVALID; } virtual bool Compressed() const { return m_bCompressedOk; } virtual bool Hashed() const { return m_bHashedOk; } virtual void GetHash( uint8 *pOut ) const { V_memcpy( pOut, m_aHash, sizeof( m_aHash ) ); } virtual CompressorType_t GetCompressorType() const { return m_bCompressedOk ? m_nCompressorType : COMPRESSORTYPE_INVALID; } virtual int GetCompressedSize() const { return m_nCompressedSize; } virtual void AbortAndCleanup() { if ( m_pCurrentJob ) { m_pCurrentJob->Abort( true ); m_pCurrentJob = NULL; } } virtual void FinishSynchronouslyAndCleanup() { if ( m_pCurrentJob ) { m_pCurrentJob->WaitForFinishAndRelease(); m_pCurrentJob = NULL; } SetStatus( PUBLISHSTATUS_ABORTED ); } virtual void Publish( const PublishFileParams_t ¶ms ) { V_strcpy( m_szOutFilename, params.m_pOutFilename ); m_pInData = params.m_pSrcData; m_nInSize = params.m_nSrcSize; m_pCallbackHandler = params.m_pCallbackHandler; m_pUserData = params.m_pUserData; m_bFreeSrcData = params.m_bFreeSrcData; m_pSrcData = params.m_pSrcData; // Cache src data so we can determine whether free'ing is OK m_pHeaderData = params.m_pHeaderData; m_nHeaderSize = params.m_nHeaderSize; m_flStartTime = g_pEngine->GetHostTime(); if ( params.m_nCompressorType != COMPRESSORTYPE_INVALID ) { m_PhaseQueue.Insert( PHASE_COMPRESSION ); m_nCompressorType = params.m_nCompressorType; // Cache compressor type } if ( params.m_bHash ) { m_PhaseQueue.Insert( PHASE_HASH ); } if ( params.m_pHeaderData ) { Assert( params.m_nHeaderSize ); m_PhaseQueue.Insert( PHASE_ADJUSTHEADER ); } m_PhaseQueue.Insert( PHASE_WRITETODISK ); m_PhaseQueue.Insert( PHASE_PUBLISH ); if ( params.m_bDeleteFile ) { m_PhaseQueue.Insert( PHASE_DELETEFILE ); } // Start off first job SetupNextJob( true ); } void PrintErrors() { // If we don't print out any error now, it'll be lost once the job is released. Kind of a hack. if ( m_pCurrentJob->GetStatus() == JOB_FAILED && !IsFailureOkForPhase() ) { CBasePublishJob *pCurrentJob = dynamic_cast< CBasePublishJob * >( m_pCurrentJob ); if ( pCurrentJob ) { g_pBlockSpewer->PrintBlockStart(); g_pBlockSpewer->PrintEventError( pCurrentJob->GetErrorStr() ); g_pBlockSpewer->PrintBlockEnd(); } } } void Abort() { // Abort the job if ( m_pCurrentJob ) { m_pCurrentJob->Abort( true ); m_pCurrentJob = NULL; } // Update status SetStatus( PUBLISHSTATUS_ABORTED ); // Let owner know we've aborted if ( m_pCallbackHandler ) { m_pCallbackHandler->OnPublishAborted( this ); } } virtual void Think() { const float flCurTime = g_pEngine->GetHostTime(); extern ConVar replay_fileserver_offload_aborttime; if ( flCurTime > m_flStartTime + replay_fileserver_offload_aborttime.GetFloat() ) { g_pBlockSpewer->PrintMsg( Replay_va( "ERROR: Publish timed out after %i seconds.", replay_fileserver_offload_aborttime.GetInt() ) ); Abort(); return; } if ( !m_pCurrentJob ) return; const int nJobStatus = m_pCurrentJob->GetStatus(); if ( nJobStatus <= JOB_OK ) { PrintErrors(); // What it says CacheOutputsOfCurrentJobForInputsOfNextJob(); // Job's done - clean up m_pCurrentJob->Release(); m_pCurrentJob = NULL; // Did the current job fail? bool bPublishDone = false; if ( nJobStatus < JOB_OK && !IsFailureOkForPhase() ) { // Don't process the next job SetStatus( PUBLISHSTATUS_FAILED ); bPublishDone = true; } else if ( IsLastPhase() ) { // nJobStatus is JOB_OK and we are in publish phase. SetStatus( PUBLISHSTATUS_OK ); bPublishDone = true; } if ( bPublishDone ) { InvokeCallback(); return; } // Otherwise, publish isn't complete yet - go to next phase and spawn job thread SetupNextJob( false ); } } protected: virtual CBasePublishJob *GetPublishJob() const = 0; char m_szOutFilename[MAX_OSPATH]; // Filename only IPublishCallbackHandler *m_pCallbackHandler; void *m_pUserData; private: enum IO_t { IO_INVALID = -1, IO_BUFFER, IO_FILE, IO_DONTCARE, // As an input, this means the job doesn't care about the main pipeline stream // (e.g. adjust header gets its inputs elsewhere) phase. As an output, this // should only be used for the final phase (publish). }; void CacheOutputsOfCurrentJobForInputsOfNextJob() { bool bFreeOldInData = false; uint8 *pOldInData = m_pInData; IO_t nOutputType = GetCurrentPhaseOutputType(); // Write phase is a special case if ( m_nPhase == PHASE_WRITETODISK ) { // Clear the in buffer m_pInData = NULL; m_nInSize = 0; bFreeOldInData = true; } else if ( nOutputType == IO_BUFFER ) { // This should always be a CBasePublishJob CBasePublishJob *pCurrentJob = dynamic_cast< CBasePublishJob * >( m_pCurrentJob ); Assert( pCurrentJob ); // Get job output buffer uint8 *pJobOutData; uint32 nJobOutDataSize; pCurrentJob->GetOutputData( &pJobOutData, &nJobOutDataSize ); // Compare output data against input data - if different, free input and replace // with output. In the case of hashing, for example, the input buffer is used // to do some computation, but the buffer itself goes untouched. if ( pJobOutData && ( m_pInData != pJobOutData || m_nInSize != nJobOutDataSize ) ) { m_pInData = pJobOutData; m_nInSize = nJobOutDataSize; bFreeOldInData = true; } } else if ( nOutputType == IO_DONTCARE ) { // This should have been cleaned up in write-to-disk phase if we're in publish phase Assert( m_nPhase != PHASE_PUBLISH || m_pInData == NULL ); } #ifdef _DEBUG else { AssertMsg( 0, "Shouldn't reach here" ); } #endif // Free old input data? if ( bFreeOldInData && ( m_bFreeSrcData || pOldInData != m_pSrcData ) ) { delete [] pOldInData; } // Cache output of current job for input of next job if ( m_nPhase != PHASE_PUBLISH ) { m_nInType = nOutputType; } } // NOTE: This needs to return a CJob ptr (i.e. and not a CBaseJob) since the job may be an AsyncWrite CJob *GetJobForPhase( Phase_t nPhase ) { CJob *pResult = NULL; switch ( nPhase ) { case PHASE_COMPRESSION: pResult = new CCompressionJob( m_pInData, m_nInSize, m_nCompressorType, &m_bCompressedOk, &m_nCompressedSize ); break; case PHASE_HASH: pResult = new CMd5Job( m_pInData, m_nInSize, &m_bHashedOk, m_aHash ); break; case PHASE_ADJUSTHEADER: { // Let the callback handler make any adjustments to the header (add md5 digest, etc.) m_pCallbackHandler->AdjustHeader( this, m_pHeaderData ); if ( m_pHeaderData && m_nHeaderSize ) { // Write the header to the target file FSAsyncControl_t hFileJob; const bool bFreeMemory = false; g_pFullFileSystem->AsyncWrite( m_szOutFilename, m_pHeaderData, m_nHeaderSize, bFreeMemory, false, &hFileJob ); pResult = (CJob *)hFileJob; } } break; case PHASE_WRITETODISK: if ( m_pInData && m_nInSize ) { // Create an asynchronous write job - if a header already exists in the file, append. FSAsyncControl_t hFileJob; const bool bAppend = m_pHeaderData != NULL; g_pFullFileSystem->AsyncWrite( m_szOutFilename, m_pInData, m_nInSize, false, bAppend, &hFileJob ); pResult = (CJob *)hFileJob; } break; case PHASE_PUBLISH: pResult = GetPublishJob(); break; case PHASE_DELETEFILE: pResult = new CDeleteLocalFileJob( m_szOutFilename ); break; default: AssertMsg( 0, "File publish phase is bad." ); } // Sanity check input type with output type of previous job Assert( GetCurrentPhaseInputType() == IO_DONTCARE || m_nInType == IO_DONTCARE || GetCurrentPhaseInputType() == m_nInType ); return pResult; } bool IsFailureOkForPhase() const { // Compression will fail (e.g. due to small buffer size), which shouldn't bring down the house. return m_nPhase == PHASE_COMPRESSION || m_nPhase == PHASE_DELETEFILE; } bool IsLastPhase() const { return m_PhaseQueue.Count() == 0; } IO_t GetCurrentPhaseInputType() const { return sm_aPhaseIOTypes[ m_nPhase ].m_nInputType; } IO_t GetCurrentPhaseOutputType() const { return sm_aPhaseIOTypes[ m_nPhase ].m_nOutputType; } void SetupNextJob( bool bFirstJob ) { // Get next phase from queue Assert( m_PhaseQueue.Count() > 0 ); m_nPhase = ( Phase_t )m_PhaseQueue.RemoveAtHead(); // Set the input type if this is the first job if ( bFirstJob ) { m_nInType = GetCurrentPhaseInputType(); } // Create the job m_pCurrentJob = GetJobForPhase( m_nPhase ); // Kick off the job now SV_GetThreadPool()->AddJob( m_pCurrentJob ); } void InvokeCallback() { if ( m_pCallbackHandler ) { m_pCallbackHandler->OnPublishComplete( this, m_pUserData ); } } CUtlQueue< uint8 > m_PhaseQueue; bool m_bCompressedOk; bool m_bHashedOk; CompressorType_t m_nCompressorType; uint8 m_aHash[16]; Phase_t m_nPhase; PublishStatus_t m_nStatus; CJob *m_pCurrentJob; uint32 m_nCompressedSize; IO_t m_nInType; uint8 *m_pInData; uint32 m_nInSize; bool m_bFreeSrcData; void *m_pSrcData; void *m_pHeaderData; int m_nHeaderSize; float m_flStartTime; struct IoInfo_t { IO_t m_nInputType; IO_t m_nOutputType; }; static IoInfo_t sm_aPhaseIOTypes[ NUM_PHASES ]; }; CBaseFilePublisher::IoInfo_t CBaseFilePublisher::sm_aPhaseIOTypes[ NUM_PHASES ] = { // Input Output { IO_BUFFER, IO_BUFFER }, // PHASE_COMPRESSION { IO_BUFFER, IO_BUFFER }, // PHASE_HASH { IO_DONTCARE, IO_DONTCARE }, // PHASE_ADJUSTHEADER - this phase can operate independent of the pipeline, so // long as any compression/hashing is taken care of. { IO_BUFFER, IO_FILE }, // PHASE_WRITETODISK { IO_FILE, IO_DONTCARE }, // PHASE_PUBLISH { IO_DONTCARE, IO_DONTCARE } // PHASE_DELETEFILE }; //---------------------------------------------------------------------------------------- class CLocalFileserverPublisher : public CBaseFilePublisher { typedef CBaseFilePublisher BaseClass; public: virtual CBasePublishJob *GetPublishJob() const { DBG( "Attempting to publish a file locally...\n" ); // Destination filename is implied return new CLocalPublishJob( m_szOutFilename ); } }; //---------------------------------------------------------------------------------------- IFilePublisher *SV_PublishFile( const PublishFileParams_t ¶ms ) { Assert( !params.m_pHeaderData || ( params.m_pHeaderData && params.m_pCallbackHandler ) ); IFilePublisher *pResult; pResult = new CLocalFileserverPublisher(); pResult->Publish( params ); return pResult; } //----------------------------------------------------------------------------------------