SFrame 3.6
core/src/SCycleBaseExec.cxx
Go to the documentation of this file.
00001 // $Id: SCycleBaseExec.cxx 335 2012-11-21 14:11:47Z krasznaa $
00002 /***************************************************************************
00003  * @Project: SFrame - ROOT-based analysis framework for ATLAS
00004  * @Package: Core
00005  *
00006  * @author Stefan Ask       <Stefan.Ask@cern.ch>           - Manchester
00007  * @author David Berge      <David.Berge@cern.ch>          - CERN
00008  * @author Johannes Haller  <Johannes.Haller@cern.ch>      - Hamburg
00009  * @author A. Krasznahorkay <Attila.Krasznahorkay@cern.ch> - CERN/Debrecen
00010  *
00011  ***************************************************************************/
00012 
00013 // System include(s):
00014 #include <cstdlib>
00015 
00016 // ROOT include(s):
00017 #include <TTree.h>
00018 #include <TFile.h>
00019 #include <TProofOutputFile.h>
00020 #include <TSystem.h>
00021 #include <TTreeCache.h>
00022 
00023 // Local include(s):
00024 #include "../include/SCycleBaseExec.h"
00025 #include "../include/SInputData.h"
00026 #include "../include/SCycleConfig.h"
00027 #include "../include/SConstants.h"
00028 #include "../include/SCycleStatistics.h"
00029 #include "../include/SOutputFile.h"
00030 #include "../include/SLogWriter.h"
00031 #include "../include/STreeType.h"
00032 
00033 #ifndef DOXYGEN_IGNORE
00034 ClassImp( SCycleBaseExec )
00035 #endif // DOXYGEN_IGNORE
00036 
00040 SCycleBaseExec::SCycleBaseExec()
00041    : m_nProcessedEvents( 0 ), m_nSkippedEvents( 0 ) {
00042 
00043    SetLogName( this->GetName() );
00044    REPORT_VERBOSE( "SCycleBaseExec constructed" );
00045 }
00046 
00047 void SCycleBaseExec::Begin( TTree* ) {
00048 
00049    REPORT_VERBOSE( "Running initialization on master" );
00050 
00051    try {
00052 
00053       //
00054       // Configure the base classes to write to the TSelector output object:
00055       //
00056       this->SetHistOutput( fOutput );
00057       this->SetNTupleOutput( fOutput );
00058       this->SetConfInput( fInput );
00059 
00060       this->ReadConfig();
00061       this->BeginMasterInputData( *m_inputData );
00062 
00063    } catch( const SError& error ) {
00064       REPORT_FATAL( "Exception caught with message: " << error.what() );
00065       throw;
00066    }
00067 
00068    return;
00069 }
00070 
00071 void SCycleBaseExec::SlaveBegin( TTree* ) {
00072 
00073    REPORT_VERBOSE( "Running initialization on slave" );
00074 
00075    try {
00076 
00077       this->ReadConfig();
00078 
00079       //
00080       // Configure the base classes to write to the TSelector output object:
00081       //
00082       this->SetHistOutput( fOutput );
00083       this->SetNTupleOutput( fOutput );
00084       this->SetConfInput( fInput );
00085 
00086       m_outputTrees.clear();
00087 
00088       //
00089       // Open a PROOF output file for the ntuple(s):
00090       //
00091       if( m_inputData->GetTrees( STreeType::OutputSimpleTree ) ||
00092           m_inputData->GetTrees( STreeType::OutputMetaTree ) ) {
00093 
00094          TProofOutputFile* proofFile = 0;
00095          char* tempDirName = 0;
00096 
00097          TNamed* out =
00098             dynamic_cast< TNamed* >( fInput->FindObject( SFrame::ProofOutputName ) );
00099          if( out ) {
00100             proofFile =
00101                new TProofOutputFile( gSystem->BaseName( TUrl( out->GetTitle() ).GetFile() ) );
00102             proofFile->SetOutputFileName( out->GetTitle() );
00103             tempDirName = 0;
00104             fOutput->Add( proofFile );
00105          } else {
00106             m_logger << DEBUG << "No PROOF output file specified in configuration -> "
00107                      << "Running in LOCAL mode" << SLogger::endmsg;
00108             proofFile = 0;
00109             // Use a more or less POSIX method for creating a unique file name:
00110             tempDirName = new char[ 300 ];
00111             if( gSystem->Getenv( "SFRAME_TEMP_DIR" ) ) {
00112                // Honor the user's preference for the temporary directory
00113                // location:
00114                sprintf( tempDirName, "%s/%s",
00115                         gSystem->Getenv( "SFRAME_TEMP_DIR" ),
00116                         SFrame::ProofOutputDirName );
00117             } else {
00118                sprintf( tempDirName, "%s", SFrame::ProofOutputDirName );
00119             }
00120             if( ! mkdtemp( tempDirName ) ) {
00121                REPORT_FATAL( "Couldn't create temporary directory name from template: "
00122                              << SFrame::ProofOutputDirName );
00123                return;
00124             }
00125             fOutput->Add( new SOutputFile( "SFrameOutputFile", TString( tempDirName ) +
00126                                            "/" +  SFrame::ProofOutputFileName ) );
00127          }
00128 
00129          if( proofFile ) {
00130             if( ! ( m_outputFile = proofFile->OpenFile( "RECREATE" ) ) ) {
00131                m_logger << WARNING << "Couldn't open output file: "
00132                         << proofFile->GetDir() << "/" << proofFile->GetFileName()
00133                         << SLogger::endmsg;
00134                m_logger << WARNING << "Saving the ntuples to memory" << SLogger::endmsg;
00135             } else {
00136                m_logger << DEBUG << "PROOF temp file opened with name: "
00137                         << m_outputFile->GetName() << SLogger::endmsg;
00138             }
00139          } else {
00140             if( ! tempDirName ) {
00141                REPORT_FATAL( "No temporary directory name? There's some serious error "
00142                              "in the code!" );
00143                return;
00144             }
00145 
00146             // Open an intermediate file in this temporary directory:
00147             if( ! ( m_outputFile = TFile::Open( TString( tempDirName ) + "/" +
00148                                                 SFrame::ProofOutputFileName , "RECREATE" ) ) ) {
00149                m_logger << WARNING << "Couldn't open output file: "
00150                         << tempDirName << "/" << SFrame::ProofOutputFileName << SLogger::endmsg;
00151                m_logger << WARNING << "Saving the ntuples to memory" << SLogger::endmsg;
00152             } else {
00153                m_logger << DEBUG << "LOCAL temp file opened with name: "
00154                         << tempDirName << "/" << SFrame::ProofOutputFileName << SLogger::endmsg;
00155             }
00156          }
00157 
00158          this->CreateOutputTrees( *m_inputData, m_outputTrees, m_outputFile );
00159 
00160          if( tempDirName ) delete[] tempDirName;
00161 
00162       } else {
00163          m_outputFile = 0;
00164       }
00165 
00166       this->BeginInputData( *m_inputData );
00167 
00168    } catch( const SError& error ) {
00169       REPORT_FATAL( "Exception caught with message: " << error.what() );
00170       throw;
00171    }
00172 
00173    m_nProcessedEvents = 0;
00174    m_nSkippedEvents = 0;
00175    m_firstInit = kTRUE;
00176 
00177    m_logger << INFO << "Initialised InputData \"" << m_inputData->GetType()
00178             << "\" (Version:" << m_inputData->GetVersion()
00179             << ") on worker node" << SLogger::endmsg;
00180 
00181    return;
00182 }
00183 
00184 void SCycleBaseExec::Init( TTree* main_tree ) {
00185 
00186    REPORT_VERBOSE( "Caching the pointer to the main input tree" );
00187    m_inputTree = main_tree;
00188 
00189    return;
00190 }
00191 
00192 Bool_t SCycleBaseExec::Notify() {
00193 
00194    REPORT_VERBOSE( "Accessing a new input file" );
00195 
00196    // Should not run the initialization when it's first called in LOCAL mode.
00197    // ROOT always calls Notify() twice in this mode. Note that this behavior
00198    // might change in future ROOT versions...
00199    if( ( GetConfig().GetRunMode() == SCycleConfig::LOCAL ) && m_firstInit ) {
00200       m_firstInit = kFALSE;
00201       return kTRUE;
00202    }
00203 
00204    TFile* inputFile = 0;
00205    try {
00206 
00207       this->LoadInputTrees( *m_inputData, m_inputTree, inputFile );
00208       this->SetHistInputFile( inputFile );
00209       this->BeginInputFile( *m_inputData );
00210 
00211    } catch( const SError& error ) {
00212       REPORT_FATAL( "Exception caught with message: " << error.what() );
00213       throw;
00214    }
00215 
00216 #if ROOT_VERSION_CODE >= ROOT_VERSION( 5, 26, 0 )
00217    // Tell the cache to learn the access pattern for the configured number
00218    // of entries:
00219    if( GetConfig().GetCacheLearnEntries() > 0 ) {
00220       m_inputTree->SetCacheLearnEntries( GetConfig().GetCacheLearnEntries() );
00221    } else {
00222       // If it's set to a negative number, add all the branches to the cache.
00223       // Otherwise (it's 0) trust that the user already added all the necessary branches
00224       // inside BeginInputFile(...).
00225       if( GetConfig().GetCacheLearnEntries() < 0 ) {
00226          m_inputTree->AddBranchToCache( "*", kTRUE );
00227       }
00228       m_inputTree->StopCacheLearningPhase();
00229    }
00230 
00231    // According to user reports, trying to turn on TTreeCache in LOCAL mode
00232    // leads to hard-to-detect, but serious problems. (The results don't match
00233    // up with the ones acquired without using a cache.) So, for now the code
00234    // doesn't try to use a cache in this case.
00235    if( GetConfig().GetUseTreeCache() &&
00236        ( GetConfig().GetRunMode() == SCycleConfig::LOCAL ) ) {
00237       m_logger << WARNING << "Can't use a TTreeCache in LOCAL mode, sorry..."
00238                << SLogger::endmsg;
00239    }
00240 #endif // ROOT_VERSION...
00241 
00242    return kTRUE;
00243 }
00244 
00245 Bool_t SCycleBaseExec::Process( Long64_t entry ) {
00246 
00247    Bool_t skipEvent = kFALSE;
00248    try {
00249 
00250       this->GetEvent( entry );
00251       m_inputData->SetEventTreeEntry( entry );
00252       this->ExecuteEvent( *m_inputData, this->CalculateWeight( *m_inputData,
00253                                                                entry ) );
00254 
00255    } catch( const SError& error ) {
00256       if( error.request() <= SError::SkipEvent ) {
00257          REPORT_VERBOSE( "Exeption caught while processing event" );
00258          REPORT_VERBOSE( " Message: " << error.what() );
00259          REPORT_VERBOSE( " --> Skipping event!" );
00260          skipEvent = kTRUE;
00261       } else {
00262          REPORT_FATAL( "Exception caught while processing event" );
00263          REPORT_FATAL( "Message: " << error.what() );
00264          throw;
00265       }
00266    }
00267 
00268    if( ! skipEvent ) {
00269       int nbytes = 0;
00270       for( std::vector< TTree* >::iterator tree = m_outputTrees.begin();
00271            tree != m_outputTrees.end(); ++tree ) {
00272          nbytes = ( *tree )->Fill();
00273          if( nbytes < 0 ) {
00274             REPORT_ERROR( "Write error occured in tree \""
00275                           << ( *tree )->GetName() << "\"" );
00276          } else if( nbytes == 0 ) {
00277             m_logger << WARNING << "No data written to tree \""
00278                      << ( *tree )->GetName() << "\"" << SLogger::endmsg;
00279          }
00280       }
00281    } else {
00282       ++m_nSkippedEvents;
00283    }
00284 
00285    ++m_nProcessedEvents;
00286    if( ! ( m_nProcessedEvents % 1000 ) ) {
00287       // Only print these messages in local mode in INFO level. In PROOF mode they're
00288       // only needed for debugging.
00289       m_logger << ( GetConfig().GetRunMode() == SCycleConfig::LOCAL ? INFO : DEBUG )
00290                << "Processing entry: " << entry << " ("
00291                << ( m_nProcessedEvents - 1 ) << " / "
00292                << ( m_inputData->GetNEventsMax() < 0 ? m_inputData->GetEventsTotal() :
00293                     m_inputData->GetNEventsMax() )
00294                << " events processed so far)" << SLogger::endmsg;
00295    }
00296 
00297    return kTRUE;
00298 }
00299 
00300 void SCycleBaseExec::SlaveTerminate() {
00301 
00302    REPORT_VERBOSE( "Running finalization on slave" );
00303 
00304    //
00305    // Tell the user cycle that the InputData has ended:
00306    //
00307    try {
00308       this->EndInputData( *m_inputData );
00309    } catch( const SError& error ) {
00310       REPORT_FATAL( "Exception caught with message: " << error.what() );
00311       throw;
00312    }
00313 
00314    //
00315    // Write the objects that are meant to be merged in-file, into
00316    // the output file:
00317    //
00318    this->WriteHistObjects( m_outputFile );
00319 
00320    //
00321    // Write the node statistics to the output:
00322    //
00323    SCycleStatistics* stat = new SCycleStatistics( SFrame::RunStatisticsName,
00324                                                   m_nProcessedEvents, m_nSkippedEvents );
00325    fOutput->Add( stat );
00326 
00327    //
00328    // Close the output file:
00329    //
00330    if( m_outputFile ) {
00331 
00332       m_logger << DEBUG << "Closing output file: " << m_outputFile->GetName()
00333                << SLogger::endmsg;
00334 
00335       // Save all the output trees into the output file:
00336       this->SaveOutputTrees( m_outputFile );
00337 
00338       // Close the output file and reset the variables:
00339       m_outputFile->SaveSelf( kTRUE );
00340       m_outputFile->Close();
00341       delete m_outputFile;
00342       m_outputFile = 0;
00343       m_outputTrees.clear();
00344 
00345    }
00346 
00347    // Reset the ntuple handling component:
00348    this->ClearCachedTrees();
00349 
00350    m_logger << INFO << "Terminated InputData \"" << m_inputData->GetType()
00351             << "\" (Version:" << m_inputData->GetVersion()
00352             << ") on worker node" << SLogger::endmsg;
00353 
00354    return;
00355 }
00356 
00357 void SCycleBaseExec::Terminate() {
00358 
00359    REPORT_VERBOSE( "Running finalization on the master" );
00360 
00361    try {
00362       this->EndMasterInputData( *m_inputData );
00363    } catch( const SError& error ) {
00364       REPORT_FATAL( "Exception caught with message: " << error.what() );
00365       throw;
00366    }
00367 
00368    return;
00369 }
00370 
00375 void SCycleBaseExec::ReadConfig() throw( SError ) {
00376 
00377    //
00378    // Read the overall cycle configuration:
00379    //
00380    SCycleConfig* config =
00381       dynamic_cast< SCycleConfig* >( fInput->FindObject( SFrame::CycleConfigName ) );
00382    if( ! config ) {
00383       REPORT_FATAL( "Couldn't retrieve the cycle configuration" );
00384       throw SError( "Couldn't find cycle configuration object", SError::SkipCycle );
00385       return;
00386    }
00387    this->SetConfig( *config );
00388    SLogWriter::Instance()->SetMinType( config->GetMsgLevel() );
00389 
00390    //
00391    // Read which InputData we're processing at the moment:
00392    //
00393    m_inputData =
00394       dynamic_cast< SInputData* >( fInput->FindObject( SFrame::CurrentInputDataName ) );
00395    if( ! m_inputData ) {
00396       REPORT_FATAL( "Couldn't retrieve the input data definition currently "
00397                     << "being processed" );
00398       throw SError( "Couldn't find current input data configuration object",
00399                     SError::SkipCycle );
00400       return;
00401    }
00402 
00403    return;
00404 }
00405 
00406 void SCycleBaseExec::ExecuteEvent( Int_t /*event*/, Int_t /*px*/,
00407                                    Int_t /*py*/ ) {
00408 
00409    REPORT_ERROR( "This function should never get called!" );
00410 
00411    return;
00412 }