SFrame 3.6
|
00001 // $Id: SCycleBaseExec.cxx 335 2012-11-21 14:11:47Z krasznaa $ 00002 /*************************************************************************** 00003 * @Project: SFrame - ROOT-based analysis framework for ATLAS 00004 * @Package: Core 00005 * 00006 * @author Stefan Ask <Stefan.Ask@cern.ch> - Manchester 00007 * @author David Berge <David.Berge@cern.ch> - CERN 00008 * @author Johannes Haller <Johannes.Haller@cern.ch> - Hamburg 00009 * @author A. Krasznahorkay <Attila.Krasznahorkay@cern.ch> - CERN/Debrecen 00010 * 00011 ***************************************************************************/ 00012 00013 // System include(s): 00014 #include <cstdlib> 00015 00016 // ROOT include(s): 00017 #include <TTree.h> 00018 #include <TFile.h> 00019 #include <TProofOutputFile.h> 00020 #include <TSystem.h> 00021 #include <TTreeCache.h> 00022 00023 // Local include(s): 00024 #include "../include/SCycleBaseExec.h" 00025 #include "../include/SInputData.h" 00026 #include "../include/SCycleConfig.h" 00027 #include "../include/SConstants.h" 00028 #include "../include/SCycleStatistics.h" 00029 #include "../include/SOutputFile.h" 00030 #include "../include/SLogWriter.h" 00031 #include "../include/STreeType.h" 00032 00033 #ifndef DOXYGEN_IGNORE 00034 ClassImp( SCycleBaseExec ) 00035 #endif // DOXYGEN_IGNORE 00036 00040 SCycleBaseExec::SCycleBaseExec() 00041 : m_nProcessedEvents( 0 ), m_nSkippedEvents( 0 ) { 00042 00043 SetLogName( this->GetName() ); 00044 REPORT_VERBOSE( "SCycleBaseExec constructed" ); 00045 } 00046 00047 void SCycleBaseExec::Begin( TTree* ) { 00048 00049 REPORT_VERBOSE( "Running initialization on master" ); 00050 00051 try { 00052 00053 // 00054 // Configure the base classes to write to the TSelector output object: 00055 // 00056 this->SetHistOutput( fOutput ); 00057 this->SetNTupleOutput( fOutput ); 00058 this->SetConfInput( fInput ); 00059 00060 this->ReadConfig(); 00061 this->BeginMasterInputData( *m_inputData ); 00062 00063 } catch( const SError& error ) { 00064 REPORT_FATAL( "Exception caught with message: " << error.what() ); 00065 throw; 00066 } 00067 00068 return; 00069 } 00070 00071 void SCycleBaseExec::SlaveBegin( TTree* ) { 00072 00073 REPORT_VERBOSE( "Running initialization on slave" ); 00074 00075 try { 00076 00077 this->ReadConfig(); 00078 00079 // 00080 // Configure the base classes to write to the TSelector output object: 00081 // 00082 this->SetHistOutput( fOutput ); 00083 this->SetNTupleOutput( fOutput ); 00084 this->SetConfInput( fInput ); 00085 00086 m_outputTrees.clear(); 00087 00088 // 00089 // Open a PROOF output file for the ntuple(s): 00090 // 00091 if( m_inputData->GetTrees( STreeType::OutputSimpleTree ) || 00092 m_inputData->GetTrees( STreeType::OutputMetaTree ) ) { 00093 00094 TProofOutputFile* proofFile = 0; 00095 char* tempDirName = 0; 00096 00097 TNamed* out = 00098 dynamic_cast< TNamed* >( fInput->FindObject( SFrame::ProofOutputName ) ); 00099 if( out ) { 00100 proofFile = 00101 new TProofOutputFile( gSystem->BaseName( TUrl( out->GetTitle() ).GetFile() ) ); 00102 proofFile->SetOutputFileName( out->GetTitle() ); 00103 tempDirName = 0; 00104 fOutput->Add( proofFile ); 00105 } else { 00106 m_logger << DEBUG << "No PROOF output file specified in configuration -> " 00107 << "Running in LOCAL mode" << SLogger::endmsg; 00108 proofFile = 0; 00109 // Use a more or less POSIX method for creating a unique file name: 00110 tempDirName = new char[ 300 ]; 00111 if( gSystem->Getenv( "SFRAME_TEMP_DIR" ) ) { 00112 // Honor the user's preference for the temporary directory 00113 // location: 00114 sprintf( tempDirName, "%s/%s", 00115 gSystem->Getenv( "SFRAME_TEMP_DIR" ), 00116 SFrame::ProofOutputDirName ); 00117 } else { 00118 sprintf( tempDirName, "%s", SFrame::ProofOutputDirName ); 00119 } 00120 if( ! mkdtemp( tempDirName ) ) { 00121 REPORT_FATAL( "Couldn't create temporary directory name from template: " 00122 << SFrame::ProofOutputDirName ); 00123 return; 00124 } 00125 fOutput->Add( new SOutputFile( "SFrameOutputFile", TString( tempDirName ) + 00126 "/" + SFrame::ProofOutputFileName ) ); 00127 } 00128 00129 if( proofFile ) { 00130 if( ! ( m_outputFile = proofFile->OpenFile( "RECREATE" ) ) ) { 00131 m_logger << WARNING << "Couldn't open output file: " 00132 << proofFile->GetDir() << "/" << proofFile->GetFileName() 00133 << SLogger::endmsg; 00134 m_logger << WARNING << "Saving the ntuples to memory" << SLogger::endmsg; 00135 } else { 00136 m_logger << DEBUG << "PROOF temp file opened with name: " 00137 << m_outputFile->GetName() << SLogger::endmsg; 00138 } 00139 } else { 00140 if( ! tempDirName ) { 00141 REPORT_FATAL( "No temporary directory name? There's some serious error " 00142 "in the code!" ); 00143 return; 00144 } 00145 00146 // Open an intermediate file in this temporary directory: 00147 if( ! ( m_outputFile = TFile::Open( TString( tempDirName ) + "/" + 00148 SFrame::ProofOutputFileName , "RECREATE" ) ) ) { 00149 m_logger << WARNING << "Couldn't open output file: " 00150 << tempDirName << "/" << SFrame::ProofOutputFileName << SLogger::endmsg; 00151 m_logger << WARNING << "Saving the ntuples to memory" << SLogger::endmsg; 00152 } else { 00153 m_logger << DEBUG << "LOCAL temp file opened with name: " 00154 << tempDirName << "/" << SFrame::ProofOutputFileName << SLogger::endmsg; 00155 } 00156 } 00157 00158 this->CreateOutputTrees( *m_inputData, m_outputTrees, m_outputFile ); 00159 00160 if( tempDirName ) delete[] tempDirName; 00161 00162 } else { 00163 m_outputFile = 0; 00164 } 00165 00166 this->BeginInputData( *m_inputData ); 00167 00168 } catch( const SError& error ) { 00169 REPORT_FATAL( "Exception caught with message: " << error.what() ); 00170 throw; 00171 } 00172 00173 m_nProcessedEvents = 0; 00174 m_nSkippedEvents = 0; 00175 m_firstInit = kTRUE; 00176 00177 m_logger << INFO << "Initialised InputData \"" << m_inputData->GetType() 00178 << "\" (Version:" << m_inputData->GetVersion() 00179 << ") on worker node" << SLogger::endmsg; 00180 00181 return; 00182 } 00183 00184 void SCycleBaseExec::Init( TTree* main_tree ) { 00185 00186 REPORT_VERBOSE( "Caching the pointer to the main input tree" ); 00187 m_inputTree = main_tree; 00188 00189 return; 00190 } 00191 00192 Bool_t SCycleBaseExec::Notify() { 00193 00194 REPORT_VERBOSE( "Accessing a new input file" ); 00195 00196 // Should not run the initialization when it's first called in LOCAL mode. 00197 // ROOT always calls Notify() twice in this mode. Note that this behavior 00198 // might change in future ROOT versions... 00199 if( ( GetConfig().GetRunMode() == SCycleConfig::LOCAL ) && m_firstInit ) { 00200 m_firstInit = kFALSE; 00201 return kTRUE; 00202 } 00203 00204 TFile* inputFile = 0; 00205 try { 00206 00207 this->LoadInputTrees( *m_inputData, m_inputTree, inputFile ); 00208 this->SetHistInputFile( inputFile ); 00209 this->BeginInputFile( *m_inputData ); 00210 00211 } catch( const SError& error ) { 00212 REPORT_FATAL( "Exception caught with message: " << error.what() ); 00213 throw; 00214 } 00215 00216 #if ROOT_VERSION_CODE >= ROOT_VERSION( 5, 26, 0 ) 00217 // Tell the cache to learn the access pattern for the configured number 00218 // of entries: 00219 if( GetConfig().GetCacheLearnEntries() > 0 ) { 00220 m_inputTree->SetCacheLearnEntries( GetConfig().GetCacheLearnEntries() ); 00221 } else { 00222 // If it's set to a negative number, add all the branches to the cache. 00223 // Otherwise (it's 0) trust that the user already added all the necessary branches 00224 // inside BeginInputFile(...). 00225 if( GetConfig().GetCacheLearnEntries() < 0 ) { 00226 m_inputTree->AddBranchToCache( "*", kTRUE ); 00227 } 00228 m_inputTree->StopCacheLearningPhase(); 00229 } 00230 00231 // According to user reports, trying to turn on TTreeCache in LOCAL mode 00232 // leads to hard-to-detect, but serious problems. (The results don't match 00233 // up with the ones acquired without using a cache.) So, for now the code 00234 // doesn't try to use a cache in this case. 00235 if( GetConfig().GetUseTreeCache() && 00236 ( GetConfig().GetRunMode() == SCycleConfig::LOCAL ) ) { 00237 m_logger << WARNING << "Can't use a TTreeCache in LOCAL mode, sorry..." 00238 << SLogger::endmsg; 00239 } 00240 #endif // ROOT_VERSION... 00241 00242 return kTRUE; 00243 } 00244 00245 Bool_t SCycleBaseExec::Process( Long64_t entry ) { 00246 00247 Bool_t skipEvent = kFALSE; 00248 try { 00249 00250 this->GetEvent( entry ); 00251 m_inputData->SetEventTreeEntry( entry ); 00252 this->ExecuteEvent( *m_inputData, this->CalculateWeight( *m_inputData, 00253 entry ) ); 00254 00255 } catch( const SError& error ) { 00256 if( error.request() <= SError::SkipEvent ) { 00257 REPORT_VERBOSE( "Exeption caught while processing event" ); 00258 REPORT_VERBOSE( " Message: " << error.what() ); 00259 REPORT_VERBOSE( " --> Skipping event!" ); 00260 skipEvent = kTRUE; 00261 } else { 00262 REPORT_FATAL( "Exception caught while processing event" ); 00263 REPORT_FATAL( "Message: " << error.what() ); 00264 throw; 00265 } 00266 } 00267 00268 if( ! skipEvent ) { 00269 int nbytes = 0; 00270 for( std::vector< TTree* >::iterator tree = m_outputTrees.begin(); 00271 tree != m_outputTrees.end(); ++tree ) { 00272 nbytes = ( *tree )->Fill(); 00273 if( nbytes < 0 ) { 00274 REPORT_ERROR( "Write error occured in tree \"" 00275 << ( *tree )->GetName() << "\"" ); 00276 } else if( nbytes == 0 ) { 00277 m_logger << WARNING << "No data written to tree \"" 00278 << ( *tree )->GetName() << "\"" << SLogger::endmsg; 00279 } 00280 } 00281 } else { 00282 ++m_nSkippedEvents; 00283 } 00284 00285 ++m_nProcessedEvents; 00286 if( ! ( m_nProcessedEvents % 1000 ) ) { 00287 // Only print these messages in local mode in INFO level. In PROOF mode they're 00288 // only needed for debugging. 00289 m_logger << ( GetConfig().GetRunMode() == SCycleConfig::LOCAL ? INFO : DEBUG ) 00290 << "Processing entry: " << entry << " (" 00291 << ( m_nProcessedEvents - 1 ) << " / " 00292 << ( m_inputData->GetNEventsMax() < 0 ? m_inputData->GetEventsTotal() : 00293 m_inputData->GetNEventsMax() ) 00294 << " events processed so far)" << SLogger::endmsg; 00295 } 00296 00297 return kTRUE; 00298 } 00299 00300 void SCycleBaseExec::SlaveTerminate() { 00301 00302 REPORT_VERBOSE( "Running finalization on slave" ); 00303 00304 // 00305 // Tell the user cycle that the InputData has ended: 00306 // 00307 try { 00308 this->EndInputData( *m_inputData ); 00309 } catch( const SError& error ) { 00310 REPORT_FATAL( "Exception caught with message: " << error.what() ); 00311 throw; 00312 } 00313 00314 // 00315 // Write the objects that are meant to be merged in-file, into 00316 // the output file: 00317 // 00318 this->WriteHistObjects( m_outputFile ); 00319 00320 // 00321 // Write the node statistics to the output: 00322 // 00323 SCycleStatistics* stat = new SCycleStatistics( SFrame::RunStatisticsName, 00324 m_nProcessedEvents, m_nSkippedEvents ); 00325 fOutput->Add( stat ); 00326 00327 // 00328 // Close the output file: 00329 // 00330 if( m_outputFile ) { 00331 00332 m_logger << DEBUG << "Closing output file: " << m_outputFile->GetName() 00333 << SLogger::endmsg; 00334 00335 // Save all the output trees into the output file: 00336 this->SaveOutputTrees( m_outputFile ); 00337 00338 // Close the output file and reset the variables: 00339 m_outputFile->SaveSelf( kTRUE ); 00340 m_outputFile->Close(); 00341 delete m_outputFile; 00342 m_outputFile = 0; 00343 m_outputTrees.clear(); 00344 00345 } 00346 00347 // Reset the ntuple handling component: 00348 this->ClearCachedTrees(); 00349 00350 m_logger << INFO << "Terminated InputData \"" << m_inputData->GetType() 00351 << "\" (Version:" << m_inputData->GetVersion() 00352 << ") on worker node" << SLogger::endmsg; 00353 00354 return; 00355 } 00356 00357 void SCycleBaseExec::Terminate() { 00358 00359 REPORT_VERBOSE( "Running finalization on the master" ); 00360 00361 try { 00362 this->EndMasterInputData( *m_inputData ); 00363 } catch( const SError& error ) { 00364 REPORT_FATAL( "Exception caught with message: " << error.what() ); 00365 throw; 00366 } 00367 00368 return; 00369 } 00370 00375 void SCycleBaseExec::ReadConfig() throw( SError ) { 00376 00377 // 00378 // Read the overall cycle configuration: 00379 // 00380 SCycleConfig* config = 00381 dynamic_cast< SCycleConfig* >( fInput->FindObject( SFrame::CycleConfigName ) ); 00382 if( ! config ) { 00383 REPORT_FATAL( "Couldn't retrieve the cycle configuration" ); 00384 throw SError( "Couldn't find cycle configuration object", SError::SkipCycle ); 00385 return; 00386 } 00387 this->SetConfig( *config ); 00388 SLogWriter::Instance()->SetMinType( config->GetMsgLevel() ); 00389 00390 // 00391 // Read which InputData we're processing at the moment: 00392 // 00393 m_inputData = 00394 dynamic_cast< SInputData* >( fInput->FindObject( SFrame::CurrentInputDataName ) ); 00395 if( ! m_inputData ) { 00396 REPORT_FATAL( "Couldn't retrieve the input data definition currently " 00397 << "being processed" ); 00398 throw SError( "Couldn't find current input data configuration object", 00399 SError::SkipCycle ); 00400 return; 00401 } 00402 00403 return; 00404 } 00405 00406 void SCycleBaseExec::ExecuteEvent( Int_t /*event*/, Int_t /*px*/, 00407 Int_t /*py*/ ) { 00408 00409 REPORT_ERROR( "This function should never get called!" ); 00410 00411 return; 00412 }