SFrame 3.6
|
00001 // $Id: SCycleController.cxx 335 2012-11-21 14:11:47Z krasznaa $ 00002 /*************************************************************************** 00003 * @Project: SFrame - ROOT-based analysis framework for ATLAS 00004 * @Package: Core 00005 * 00006 * @author Stefan Ask <Stefan.Ask@cern.ch> - Manchester 00007 * @author David Berge <David.Berge@cern.ch> - CERN 00008 * @author Johannes Haller <Johannes.Haller@cern.ch> - Hamburg 00009 * @author A. Krasznahorkay <Attila.Krasznahorkay@cern.ch> - CERN/Debrecen 00010 * 00011 ***************************************************************************/ 00012 00013 // STL include(s): 00014 #include <iomanip> 00015 #include <iostream> 00016 #include <sstream> 00017 #include <cstdlib> 00018 #include <limits> 00019 00020 // ROOT include(s): 00021 #include <TDOMParser.h> 00022 #include <TXMLNode.h> 00023 #include <TXMLDocument.h> 00024 #include <TXMLAttr.h> 00025 #include <TStopwatch.h> 00026 #include <TSystem.h> 00027 #include <TClass.h> 00028 #include <TROOT.h> 00029 #include <TPython.h> 00030 #include <TChain.h> 00031 #include <TList.h> 00032 #include <TFile.h> 00033 #include <TProof.h> 00034 #include <TProofOutputFile.h> 00035 #include <TDSet.h> 00036 #include <TFileCollection.h> 00037 #include <THashList.h> 00038 #include <TFileInfo.h> 00039 #include <TObjString.h> 00040 00041 // Local include(s): 00042 #include "../include/SCycleController.h" 00043 #include "../include/ISCycleBase.h" 00044 #include "../include/SLogWriter.h" 00045 #include "../include/SConstants.h" 00046 #include "../include/SParLocator.h" 00047 #include "../include/SCycleStatistics.h" 00048 #include "../include/SFileMerger.h" 00049 #include "../include/SOutputFile.h" 00050 #include "../include/SCycleConfig.h" 00051 #include "../include/SCycleOutput.h" 00052 #include "../include/SProofManager.h" 00053 00062 SCycleController::SCycleController( const TString& xmlConfigFile ) 00063 : m_curCycle( 0 ), m_isInitialized( kFALSE ), m_xmlConfigFile( xmlConfigFile ), 00064 m_proof( 0 ), m_logger( "SCycleController" ) { 00065 00066 } 00067 00074 SCycleController::~SCycleController() { 00075 00076 std::vector< ISCycleBase* >::const_iterator it = m_analysisCycles.begin(); 00077 for( ; it != m_analysisCycles.end(); ++it) { 00078 delete ( *it ); 00079 } 00080 00081 ShutDownProof(); 00082 } 00083 00092 void SCycleController::Initialize() throw( SError ) { 00093 00094 m_logger << INFO << "Initializing" << SLogger::endmsg; 00095 00096 // Just for kicks, lets measure the time it needs to initialise the 00097 // analysis: 00098 TStopwatch timer; 00099 timer.Start(); 00100 00101 // first clean up everything in case this is called multiple times 00102 m_curCycle = 0; 00103 this->DeleteAllAnalysisCycles(); 00104 m_parPackages.clear(); 00105 00106 // --------------- xml read 00107 m_logger << INFO << "read xml file: '" << m_xmlConfigFile << "'" << SLogger::endmsg; 00108 00109 TDOMParser xmlparser; 00110 00111 // This is a new feature only available in the newest ROOT 00112 // nightlies. It makes it possible to have the input file 00113 // definitions in external XML files that are imported in 00114 // the main configuration XML file. It's conventient when 00115 // using a lot of the same input files in different cycles. 00116 #if ROOT_VERSION_CODE >= ROOT_VERSION( 5, 17, 4 ) 00117 xmlparser.SetReplaceEntities( kTRUE ); 00118 #endif 00119 00120 Int_t parseError = xmlparser.ParseFile( m_xmlConfigFile ); 00121 if( parseError ) { 00122 SError error( SError::StopExecution ); 00123 error << "Loading of xml document \"" << m_xmlConfigFile 00124 << "\" failed"; 00125 throw error; 00126 } 00127 00128 // --------------- interpret xml file 00129 00130 // read cycles and libraries 00131 00132 TXMLDocument* xmldoc = xmlparser.GetXMLDocument(); 00133 TXMLNode* rootNode = xmldoc->GetRootNode(); 00134 00135 if( rootNode->GetNodeName() == TString( "JobConfiguration" ) ) { 00136 std::string jobName = ""; 00137 std::string outputLevelString = ""; 00138 TListIter attribIt( rootNode->GetAttributes() ); 00139 TXMLAttr* curAttr( 0 ); 00140 while ( (curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) { 00141 if( curAttr->GetName() == TString( "JobName" ) ) 00142 jobName = curAttr->GetValue(); 00143 else if( curAttr->GetName() == TString( "OutputLevel" ) ) 00144 outputLevelString = curAttr->GetValue(); 00145 } 00146 SMsgType type = INFO; 00147 if ( outputLevelString == "VERBOSE" ) type = VERBOSE; 00148 else if( outputLevelString == "DEBUG" ) type = DEBUG; 00149 else if( outputLevelString == "INFO" ) type = INFO; 00150 else if( outputLevelString == "WARNING" ) type = WARNING; 00151 else if( outputLevelString == "ERROR" ) type = ERROR; 00152 else if( outputLevelString == "FATAL" ) type = FATAL; 00153 else if( outputLevelString == "ALWAYS" ) type = ALWAYS; 00154 else { 00155 m_logger << WARNING << "Message output level (" 00156 << outputLevelString << ") not recognized" 00157 << SLogger::endmsg; 00158 } 00159 SLogWriter::Instance()->SetMinType( type ); 00160 00161 TXMLNode* nodes = rootNode->GetChildren(); 00162 00163 // now loop over nodes 00164 while( nodes != 0 ) { 00165 if( ! nodes->HasAttributes() ) { 00166 nodes = nodes->GetNextNode(); 00167 continue; 00168 } 00169 00170 try { // For catching "cycle level" problems... 00171 00172 if( nodes->GetNodeName() == TString( "Cycle" ) ) { 00173 00174 std::string cycleName = ""; 00175 00176 attribIt = nodes->GetAttributes(); 00177 curAttr = 0; 00178 while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) { 00179 if( curAttr->GetName() == TString( "Name" ) ) 00180 cycleName = curAttr->GetValue(); 00181 } 00182 00183 TClass* cycleClass = gROOT->GetClass( cycleName.c_str(), true ); 00184 if( ! cycleClass || ! cycleClass->InheritsFrom( "ISCycleBase" ) ) { 00185 SError error( SError::SkipCycle ); 00186 error << "Loading of class \"" << cycleName << "\" failed"; 00187 throw error; 00188 } 00189 00190 ISCycleBase* cycle = reinterpret_cast< ISCycleBase* >( cycleClass->New() ); 00191 00192 m_logger << INFO << "Created cycle '" << cycleName << "'" 00193 << SLogger::endmsg; 00194 00195 cycle->Initialize( nodes ); 00196 this->AddAnalysisCycle( cycle ); 00197 00198 } else if( nodes->GetNodeName() == TString( "Library" ) ) { 00199 00200 std::string libraryName = ""; 00201 attribIt = nodes->GetAttributes(); 00202 curAttr = 0; 00203 while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) { 00204 if( curAttr->GetName() == TString( "Name" ) ) 00205 libraryName = curAttr->GetValue(); 00206 } 00207 REPORT_VERBOSE( "Trying to load library \"" << libraryName << "\"" ); 00208 00209 int ret = 0; 00210 if( ( ret = gSystem->Load( libraryName.c_str() ) ) >= 0 ) { 00211 m_logger << DEBUG << "Library loaded: \"" << libraryName << "\"" 00212 << SLogger::endmsg; 00213 } else { 00214 SError error( SError::StopExecution ); 00215 error << "Library failed to load: \"" << libraryName 00216 << "\"\nRet. Val.: " << ret; 00217 throw error; 00218 } 00219 00220 } else if( nodes->GetNodeName() == TString( "PyLibrary" ) ) { 00221 00222 std::string libraryName = ""; 00223 attribIt = nodes->GetAttributes(); 00224 curAttr = 0; 00225 while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) { 00226 if( curAttr->GetName() == TString( "Name" ) ) 00227 libraryName = curAttr->GetValue(); 00228 } 00229 REPORT_VERBOSE( "Trying to load python library \"" << libraryName 00230 << "\"" ); 00231 00232 std::ostringstream command; 00233 command << "import " << libraryName; 00234 00235 TPython::Exec( command.str().c_str() ); 00236 00237 } else if( nodes->GetNodeName() == TString( "Package" ) ) { 00238 00239 TString packageName; 00240 attribIt = nodes->GetAttributes(); 00241 curAttr = 0; 00242 while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) { 00243 if( curAttr->GetName() == TString( "Name" ) ) 00244 packageName = curAttr->GetValue(); 00245 } 00246 m_logger << DEBUG << "Using PROOF ARchive package: " << packageName 00247 << SLogger::endmsg; 00248 00249 m_parPackages.push_back( packageName ); 00250 00251 } 00252 00253 } catch( const SError& error ) { 00254 // 00255 // This is where I catch "cycle level" problems: 00256 // 00257 if( error.request() <= SError::SkipCycle ) { 00258 // If just this cycle has to be skipped: 00259 REPORT_ERROR( "Exception caught while processing node: " 00260 << nodes->GetNodeName() ); 00261 REPORT_ERROR( "Message: " << error.what() ); 00262 REPORT_ERROR( "--> Skipping cycle!" ); 00263 00264 nodes = nodes->GetNextNode(); 00265 continue; 00266 } else { 00267 // If this is more serious: 00268 throw; 00269 } 00270 } 00271 00272 nodes = nodes->GetNextNode(); 00273 00274 } // end loop over nodes 00275 00276 m_logger << INFO << "Job '" << jobName << "' configured" << SLogger::endmsg; 00277 00278 } else { 00279 SError error( SError::StopExecution ); 00280 error << "XML root node " << rootNode->GetNodeName() 00281 << " has wrong format"; 00282 throw error; 00283 } 00284 00285 // --------------- end of xml interpretation 00286 00287 // Print how much time it took to initialise the analysis: 00288 timer.Stop(); 00289 m_logger << INFO << "Time needed for initialisation: " << std::setw( 6 ) 00290 << std::setprecision( 2 ) << timer.RealTime() << " s" 00291 << SLogger::endmsg; 00292 00293 // Print memory consumption after initialising the analysis: 00294 ProcInfo_t procinfo; 00295 gSystem->GetProcInfo( &procinfo ); 00296 m_logger << DEBUG << "Memory consumption after initialisation:" << SLogger::endmsg; 00297 m_logger.setf( std::ios::fixed ); 00298 m_logger << DEBUG << " Resident mem.: " << std::setw( 7 ) << procinfo.fMemResident 00299 << " kB; Virtual mem.: " << std::setw( 7 ) << procinfo.fMemVirtual 00300 << " kB" << SLogger::endmsg; 00301 00302 // set object status to be ready 00303 m_isInitialized = kTRUE; 00304 00305 return; 00306 } 00307 00319 void SCycleController::ExecuteAllCycles() throw( SError ) { 00320 00321 if( ! m_isInitialized ) { 00322 throw SError( "SCycleController is not initialized", 00323 SError::StopExecution ); 00324 } 00325 00326 m_logger << INFO << "Entering ExecuteAllCycles()" << SLogger::endmsg; 00327 00328 std::vector< ISCycleBase* >::const_iterator it = m_analysisCycles.begin(); 00329 for( ; it != m_analysisCycles.end(); ++it ) { 00330 this->ExecuteNextCycle(); 00331 } 00332 00333 return; 00334 } 00335 00344 void SCycleController::ExecuteNextCycle() throw( SError ) { 00345 00346 if( ! m_isInitialized ) { 00347 throw SError( "SCycleController is not initialized", 00348 SError::StopExecution ); 00349 } 00350 00351 // 00352 // Measure the total time needed for this cycle: 00353 // 00354 TStopwatch timer; 00355 timer.Start(); 00356 00357 // 00358 // Access the current cycle: 00359 // 00360 ISCycleBase* cycle = m_analysisCycles.at( m_curCycle ); 00361 TString cycleName = cycle->GetName(); 00362 00363 // 00364 // Create a copy of the cycle configuration, so that it can be given to PROOF: 00365 // 00366 SCycleConfig config = cycle->GetConfig(); 00367 config.SetName( SFrame::CycleConfigName ); 00368 config.ArrangeInputData(); // To handle multiple ID of the same type... 00369 config.ValidateInput(); // This is needed for the proper weighting... 00370 config.SetMsgLevel( SLogWriter::Instance()->GetMinType() ); // For the correct msg level... 00371 config.SetCycleName( cycle->GetName() ); // For technical reasons... 00372 cycle->SetConfig( config ); 00373 00374 m_logger << INFO << "Executing Cycle #" << m_curCycle << " ('" << cycleName << "') " 00375 << ( config.GetRunMode() == SCycleConfig::LOCAL ? "locally" : 00376 "on PROOF" ) 00377 << SLogger::endmsg; 00378 00379 // 00380 // Make some initialisation steps before starting the cycle: 00381 // 00382 if( config.GetRunMode() == SCycleConfig::PROOF ) { 00383 00384 // 00385 // Connect to the PROOF server: 00386 // 00387 InitProof( config.GetProofServer(), config.GetProofNodes() ); 00388 00389 // 00390 // Upload and compile all the packages specified in the configuration: 00391 // 00392 if( ! SProofManager::Instance()->IsConfigured( config.GetProofServer() ) ) { 00393 for( std::vector< TString >::const_iterator package = m_parPackages.begin(); 00394 package != m_parPackages.end(); ++package ) { 00395 00396 // Find the full path name of the package: 00397 TString pkg = SParLocator::Locate( *package ); 00398 if( pkg == "" ) continue; 00399 00400 REPORT_VERBOSE( "Uploading package: " << pkg ); 00401 if( m_proof->UploadPackage( pkg ) ) { 00402 REPORT_ERROR( "There was a problem with uploading " 00403 << *package ); 00404 throw SError( *package + " could not be uploaded to PROOF", 00405 SError::SkipCycle ); 00406 } 00407 00408 Ssiz_t slash_pos = pkg.Last( '/' ); 00409 pkg.Remove( 0, slash_pos + 1 ); 00410 if( pkg.EndsWith( ".par", TString::kIgnoreCase ) ) { 00411 pkg.Remove( pkg.Length() - 4, 4 ); 00412 } 00413 00414 m_logger << INFO << "Enabling package: " << pkg << SLogger::endmsg; 00415 if( m_proof->EnablePackage( pkg, kTRUE ) ) { 00416 REPORT_ERROR( "There was a problem with enabling " 00417 << *package ); 00418 throw SError( *package + " could not be enabled on PROOF", 00419 SError::SkipCycle ); 00420 } 00421 } 00422 } 00423 SProofManager::Instance()->SetConfigured( config.GetProofServer() ); 00424 00425 } 00426 00427 // Number of processed events: 00428 Long64_t procev = 0; 00429 // Number of skipped events: 00430 Long64_t skipev = 0; 00431 00432 // 00433 // The begin cycle function has to be called here by hand: 00434 // 00435 cycle->BeginCycle(); 00436 00437 // 00438 // Loop over all defined input data types: 00439 // 00440 for( SCycleConfig::id_type::const_iterator id = config.GetInputData().begin(); 00441 id != config.GetInputData().end(); ++id ) { 00442 00443 // 00444 // Decide how to write the output file at the end of processing this InputData. 00445 // The InputData objects should be arranged by their type at this point... 00446 // 00447 Bool_t updateOutput = kFALSE; 00448 SCycleConfig::id_type::const_iterator previous_id = id; 00449 if( previous_id == config.GetInputData().begin() ) { 00450 updateOutput = kFALSE; 00451 REPORT_VERBOSE( "New output file will be opened for ID type: " 00452 << id->GetType() ); 00453 } else { 00454 --previous_id; 00455 if( ( previous_id->GetType() == id->GetType() ) && 00456 ( previous_id->GetVersion() == id->GetVersion() ) ) { 00457 updateOutput = kTRUE; 00458 REPORT_VERBOSE( "Output file will be updated for ID type: " 00459 << id->GetType() ); 00460 } else { 00461 updateOutput = kFALSE; 00462 REPORT_VERBOSE( "New output file will be opened for ID type: " 00463 << id->GetType() ); 00464 } 00465 } 00466 00467 // 00468 // Each input data has to have at least one input tree: 00469 // 00470 if( ! id->HasInputTrees() ) { 00471 REPORT_ERROR( "No input trees defined in input data " << id->GetType() ); 00472 REPORT_ERROR( "Skipping it from processing" ); 00473 continue; 00474 } 00475 00476 // Find the first event-level input tree in the configuration: 00477 REPORT_VERBOSE( "Finding the name of the main event-level input TTree..." ); 00478 const char* treeName = 0; 00479 for( std::map< Int_t, std::vector< STree > >::const_iterator trees = 00480 id->GetTrees().begin(); trees != id->GetTrees().end(); ++trees ) { 00481 for( std::vector< STree >::const_iterator st = trees->second.begin(); 00482 st != trees->second.end(); ++st ) { 00483 if( ( st->type & STree::INPUT_TREE ) && 00484 ( st->type & STree::EVENT_TREE ) ) { 00485 treeName = st->treeName.Data(); 00486 break; 00487 } 00488 } 00489 } 00490 if( ! treeName ) { 00491 REPORT_ERROR( "Can't determine input TTree name for input data " 00492 << id->GetType() ); 00493 REPORT_ERROR( "Skipping it from processing" ); 00494 continue; 00495 } else { 00496 REPORT_VERBOSE( "The name of the main event-level input TTree is: " 00497 << treeName ); 00498 } 00499 00500 m_logger << INFO << "Processing input data type: " << id->GetType() 00501 << " version: " << id->GetVersion() << SLogger::endmsg; 00502 00503 // 00504 // Create a copy of the input data configuration, so that it can be 00505 // given to PROOF: 00506 // 00507 SInputData inputData = *id; 00508 inputData.SetName( SFrame::CurrentInputDataName ); 00509 00510 // 00511 // Retrieve the configuration object list from the cycle: 00512 // 00513 const TList& configList = cycle->GetConfigurationObjects(); 00514 00515 // 00516 // Calculate how many events to process: 00517 // 00518 const Long64_t evmax = ( id->GetNEventsMax() == -1 ? 00519 std::numeric_limits< Long64_t >::max() : 00520 id->GetNEventsMax() ); 00521 00522 // This will point to the created output objects: 00523 TList* outputs = 0; 00524 00525 // 00526 // The cycle can be run in two modes: 00527 // 00528 if( config.GetRunMode() == SCycleConfig::LOCAL ) { 00529 00530 if( id->GetDataSets().size() ) { 00531 REPORT_ERROR( "Can't use DataSet-s as input in LOCAL mode!" ); 00532 REPORT_ERROR( "Skipping InputData type: " << id->GetType() 00533 << " version: " << id->GetVersion() ); 00534 continue; 00535 } 00536 00537 // 00538 // Create a chain with all the specified input files: 00539 // 00540 REPORT_VERBOSE( "Creating TChain to run the cycle on..." ); 00541 TChain chain( treeName ); 00542 for( std::vector< SFile >::const_iterator file = id->GetSFileIn().begin(); 00543 file != id->GetSFileIn().end(); ++file ) { 00544 REPORT_VERBOSE( "Adding file: " << file->file ); 00545 chain.AddFile( file->file ); 00546 } 00547 00548 // 00549 // Give the configuration to the cycle by hand: 00550 // 00551 TList list; 00552 list.Add( &config ); 00553 list.Add( &inputData ); 00554 for( Int_t i = 0; i < configList.GetSize(); ++i ) { 00555 list.Add( configList.At( i ) ); 00556 } 00557 cycle->SetInputList( &list ); 00558 00559 // 00560 // Run the cycle: 00561 // 00562 chain.Process( cycle, "", evmax, id->GetNEventsSkip() ); 00563 outputs = cycle->GetOutputList(); 00564 00565 } else if( config.GetRunMode() == SCycleConfig::PROOF ) { 00566 00567 // 00568 // Check that the PROOF server is available and ready. For instance it's not 00569 // a good idea to send a job to a server that crashed on the previous 00570 // input data... 00571 // 00572 if( ! m_proof->IsValid() ) { 00573 REPORT_ERROR( "PROOF server doesn't seem to be available: " 00574 << m_proof->GetManager()->GetUrl() ); 00575 REPORT_ERROR( "Aborting execution of cycle!" ); 00576 break; 00577 } 00578 00579 // This object describes how to create the temporary PROOF output 00580 // files in the cycles: 00581 TNamed proofOutputFile( TString( SFrame::ProofOutputName ), 00582 ( config.GetProofWorkDir() == "" ? "./" : 00583 config.GetProofWorkDir() + "/" ) + 00584 cycle->GetName() + "-" + inputData.GetType() + 00585 "-" + inputData.GetVersion() + 00586 "-TempNTuple.root" ); 00587 00588 // 00589 // Clear the query results from memory (Thanks to Gerri!): 00590 // 00591 if( m_proof->GetQueryResults() ) { 00592 m_proof->GetQueryResults()->SetOwner( kTRUE ); 00593 m_proof->GetQueryResults()->Clear(); 00594 m_proof->GetQueryResults()->SetOwner( kFALSE ); 00595 } 00596 00597 // 00598 // Give the configuration to PROOF, and tweak it a little: 00599 // 00600 m_proof->ClearInput(); 00601 // Only output a maximum of 10 messages per node about memory usage per query: 00602 const Long64_t eventsPerNode = ( inputData.GetEventsTotal() / 00603 m_proof->GetParallel() ); 00604 m_proof->SetParameter( "PROOF_MemLogFreq", 00605 ( Long64_t ) ( eventsPerNode > 10000 ? 00606 ( eventsPerNode / 10 ) : 00607 1000 ) ); 00608 // Make sure that we can use as many workers per node as we want: 00609 m_proof->SetParameter( "PROOF_MaxSlavesPerNode", ( Long_t ) 9999999 ); 00610 // Configure the usage of TTreeCache on the cluster: 00611 if( config.GetUseTreeCache() ) { 00612 m_proof->SetParameter( "PROOF_UseTreeCache", ( Int_t ) 1 ); 00613 } else { 00614 m_proof->SetParameter( "PROOF_UseTreeCache", ( Int_t ) 0 ); 00615 } 00616 m_proof->SetParameter( "PROOF_CacheSize", config.GetCacheSize() ); 00617 // Configure whether the workers are allowed to read each others' files: 00618 if( config.GetProcessOnlyLocal() ) { 00619 m_proof->SetParameter( "PROOF_ForceLocal", ( Int_t ) 1 ); 00620 } else { 00621 m_proof->SetParameter( "PROOF_ForceLocal", ( Int_t ) 0 ); 00622 } 00623 00624 // Add the "input objects" to PROOF: 00625 m_proof->AddInput( &config ); 00626 m_proof->AddInput( &inputData ); 00627 m_proof->AddInput( &proofOutputFile ); 00628 for( Int_t i = 0; i < configList.GetSize(); ++i ) { 00629 m_proof->AddInput( configList.At( i ) ); 00630 } 00631 00632 if( id->GetDataSets().size() ) { 00633 00634 // Merge the dataset names in the way that PROOF expects them. This is 00635 // "<dataset 1>|<dataset 2>|...". Note that this only works in ROOT 00636 // versions newer than 5.27/02, but SInputData should take care about 00637 // removing multiple datasets when using an "old" ROOT release. 00638 TString dsets = ""; 00639 for( std::vector< SDataSet >::const_iterator ds = id->GetDataSets().begin(); 00640 ds != id->GetDataSets().end(); ++ds ) { 00641 00642 if( ds != id->GetDataSets().begin() ) { 00643 dsets += "|"; 00644 } 00645 dsets += ds->name + "#" + treeName; 00646 } 00647 00648 // Process the events: 00649 if( m_proof->Process( dsets, cycle->GetName(), "", evmax, 00650 id->GetNEventsSkip() ) == -1 ) { 00651 REPORT_ERROR( "There was an error processing:" ); 00652 REPORT_ERROR( " Cycle = " << cycle->GetName() ); 00653 REPORT_ERROR( " ID type = " << inputData.GetType() ); 00654 REPORT_ERROR( " ID version = " << inputData.GetVersion() ); 00655 REPORT_ERROR( "Stopping the execution of this cycle!" ); 00656 break; 00657 } 00658 00659 } else if( id->GetSFileIn().size() ) { 00660 00661 // 00662 // Check if the validation was skipped. If it was, then the SInputData objects 00663 // didn't create a TDSet object of its own. So we have to create a simple one 00664 // here. Otherwise just use the TDSet created by SInputData. 00665 // 00666 if( id->GetSkipValid() ) { 00667 00668 // Create the dataset object first: 00669 TChain chain( treeName ); 00670 for( std::vector< SFile >::const_iterator file = id->GetSFileIn().begin(); 00671 file != id->GetSFileIn().end(); ++file ) { 00672 chain.Add( file->file ); 00673 } 00674 TDSet set( chain ); 00675 00676 // Process the events: 00677 if( m_proof->Process( &set, cycle->GetName(), "", evmax, 00678 id->GetNEventsSkip() ) == -1 ) { 00679 REPORT_ERROR( "There was an error processing:" ); 00680 REPORT_ERROR( " Cycle = " << cycle->GetName() ); 00681 REPORT_ERROR( " ID type = " << inputData.GetType() ); 00682 REPORT_ERROR( " ID version = " << inputData.GetVersion() ); 00683 REPORT_ERROR( "Stopping the execution of this cycle!" ); 00684 break; 00685 } 00686 00687 } else { 00688 00689 // 00690 // Run the cycle on PROOF. Unfortunately the checking of the "successfullness" 00691 // of the PROOF job is not working too well... Even after a *lot* of error 00692 // messages the TProof::Process(...) command can still return a success code, 00693 // which can lead to nasty crashes... 00694 // 00695 if( m_proof->Process( id->GetDSet(), cycle->GetName(), "", evmax, 00696 id->GetNEventsSkip() ) == -1 ) { 00697 REPORT_ERROR( "There was an error processing:" ); 00698 REPORT_ERROR( " Cycle = " << cycle->GetName() ); 00699 REPORT_ERROR( " ID type = " << inputData.GetType() ); 00700 REPORT_ERROR( " ID version = " << inputData.GetVersion() ); 00701 REPORT_ERROR( "Stopping the execution of this cycle!" ); 00702 break; 00703 } 00704 } 00705 00706 } else { 00707 REPORT_ERROR( "Nothing was executed using PROOF!" ); 00708 } 00709 00710 // The missing file accounting only started in ROOT 5.28 as far as I can tell: 00711 #if ROOT_VERSION_CODE >= ROOT_VERSION( 5, 28, 00 ) 00712 // Only do this for non-Lite PROOF: 00713 if( ! m_proof->IsLite() ) { 00714 // Get the list of missing files: 00715 TFileCollection* missing = m_proof->GetMissingFiles(); 00716 if( missing ) { 00717 // Get the list of files: 00718 THashList* flist = missing->GetList(); 00719 if( flist->GetEntries() ) { 00720 m_logger << WARNING << "The following files were not processed:" << SLogger::endmsg; 00721 for( Int_t i = 0; i < flist->GetEntries(); ++i ) { 00722 TFileInfo* finfo = dynamic_cast< TFileInfo* >( flist->At( i ) ); 00723 if( ! finfo ) { 00724 REPORT_ERROR( "Missing file list not in expected format" ); 00725 continue; 00726 } 00727 m_logger << " " << finfo->GetCurrentUrl()->GetUrl() << SLogger::endmsg; 00728 } 00729 } 00730 // Remove the object: 00731 delete missing; 00732 missing = 0; 00733 } 00734 } 00735 #endif // ROOT_VERSION( 5, 28, 00 ) 00736 00737 outputs = m_proof->GetOutputList(); 00738 00739 } else { 00740 throw SError( "Running mode not recognised!", SError::SkipCycle ); 00741 } 00742 00743 if( ! outputs ) { 00744 REPORT_ERROR( "Cycle output could not be retrieved." ); 00745 REPORT_ERROR( "NOT writing the output of cycle \"" 00746 << cycle->GetName() << "\", ID \"" << inputData.GetType() 00747 << "\", Version \"" << inputData.GetVersion() << "\"" ); 00748 continue; 00749 } 00750 00751 // 00752 // Collect the statistics from this input data: 00753 // 00754 SCycleStatistics* stat = 00755 dynamic_cast< SCycleStatistics* >( outputs->FindObject( SFrame::RunStatisticsName ) ); 00756 if( stat ) { 00757 procev += stat->GetProcessedEvents(); 00758 skipev += stat->GetSkippedEvents(); 00759 } else { 00760 m_logger << WARNING << "Cycle statistics not received from: " 00761 << cycle->GetName() << SLogger::endmsg; 00762 m_logger << WARNING << "Printed statistics will not be correct!" 00763 << SLogger::endmsg; 00764 } 00765 00766 // 00767 // Write out the objects produced by the cycle: 00768 // 00769 TString outputFileName = config.GetOutputDirectory() + cycleName + "." + 00770 id->GetType() + "." + id->GetVersion() + config.GetPostFix() + ".root"; 00771 outputFileName.ReplaceAll( "::", "." ); 00772 WriteCycleOutput( outputs, outputFileName, 00773 config.GetStringConfig( &inputData ), 00774 updateOutput ); 00775 00776 // This cleanup is giving me endless trouble on the NYU Tier3 with 00777 // ROOT 5.28c. So, knowing no better solution, I just disabled it 00778 // on new ROOT versions for now... 00779 #if ROOT_VERSION_CODE < ROOT_VERSION( 5, 28, 0 ) 00780 outputs->SetOwner( kTRUE ); 00781 #endif 00782 outputs->Clear(); 00783 00784 } 00785 00786 // 00787 // The end cycle function has to be called here by hand: 00788 // 00789 cycle->EndCycle(); 00790 00791 // The cycle processing is done at this point: 00792 timer.Stop(); 00793 00794 m_logger << INFO << "Overall cycle statistics:" << SLogger::endmsg; 00795 m_logger.setf( std::ios::fixed ); 00796 m_logger << INFO << std::setw( 10 ) << std::setfill( ' ' ) << std::setprecision( 0 ) 00797 << procev << " Events - Real time " << std::setw( 6 ) << std::setprecision( 2 ) 00798 << timer.RealTime() << " s - " << std::setw( 5 ) 00799 << std::setprecision( 0 ) << ( procev / timer.RealTime() ) << " Hz | CPU time " 00800 << std::setw( 6 ) << std::setprecision( 2 ) << timer.CpuTime() << " s - " 00801 << std::setw( 5 ) << std::setprecision( 0 ) << ( procev / timer.CpuTime() ) 00802 << " Hz" << SLogger::endmsg; 00803 00804 ++m_curCycle; 00805 return; 00806 } 00807 00816 void SCycleController::AddAnalysisCycle( ISCycleBase* cycleAlg ) { 00817 00818 m_analysisCycles.push_back( cycleAlg ); 00819 return; 00820 } 00821 00825 void SCycleController::DeleteAllAnalysisCycles() { 00826 00827 m_logger << INFO << "Deleting all analysis cycle algorithms from memory" 00828 << SLogger::endmsg; 00829 00830 std::vector< ISCycleBase* >::const_iterator it = m_analysisCycles.begin(); 00831 for( ; it != m_analysisCycles.end(); ++it ) { 00832 delete ( *it ); 00833 } 00834 00835 m_analysisCycles.clear(); 00836 00837 return; 00838 } 00839 00840 void SCycleController::InitProof( const TString& server, const Int_t& nodes ) { 00841 00842 // 00843 // Open the connection: 00844 // 00845 m_logger << INFO << "Opening PROOF connection to: " << server 00846 << SLogger::endmsg; 00847 m_proof = SProofManager::Instance()->Open( server ); 00848 if( nodes > 0 ) m_proof->SetParallel( nodes ); 00849 00850 return; 00851 } 00852 00853 void SCycleController::ShutDownProof() { 00854 00855 // 00856 // Clean up the PROOF connection(s): 00857 // 00858 SProofManager::Instance()->Cleanup(); 00859 m_proof = 0; 00860 00861 return; 00862 } 00863 00864 void SCycleController::WriteCycleOutput( TList* olist, 00865 const TString& filename, 00866 const TString& config, 00867 Bool_t update ) const { 00868 00869 m_logger << INFO << "Writing output of \"" 00870 << m_analysisCycles.at( m_curCycle )->GetName() << "\" to: " 00871 << filename << SLogger::endmsg; 00872 00873 // 00874 // Open the output file: 00875 // 00876 TFile outputFile( filename , ( update ? "UPDATE" : "RECREATE" ) ); 00877 00878 // 00879 // List of files holding TTrees: 00880 // 00881 std::vector< TString > filesToMerge; 00882 00883 // 00884 // Merge the memory objects into the output file: 00885 // 00886 for( Int_t i = 0; i < olist->GetSize(); ++i ) { 00887 00888 outputFile.cd(); 00889 00890 if( dynamic_cast< SCycleOutput* >( olist->At( i ) ) ) { 00891 olist->At( i )->Write(); 00892 m_logger << DEBUG << "Written object: " << olist->At( i )->GetName() 00893 << SLogger::endmsg; 00894 } else if( dynamic_cast< TProofOutputFile* >( olist->At( i ) ) ) { 00895 TProofOutputFile* pfile = dynamic_cast< TProofOutputFile* >( olist->At( i ) ); 00896 filesToMerge.push_back( pfile->GetOutputFileName() ); 00897 } else if ( dynamic_cast< SOutputFile* >( olist->At( i ) ) ) { 00898 SOutputFile* sfile = dynamic_cast< SOutputFile* >( olist->At( i ) ); 00899 filesToMerge.push_back( sfile->GetFileName() ); 00900 } else { 00901 /* 00902 TDirectory* proofdir = outputFile.GetDirectory( "PROOF" ); 00903 if( ! proofdir ) { 00904 proofdir = outputFile.mkdir( "PROOF", "PROOF related objects" ); 00905 } 00906 proofdir->cd(); 00907 olist->At( i )->Write(); 00908 */ 00909 } 00910 } 00911 00912 // 00913 // Add the cycle configuration as metadata to the output file: 00914 // 00915 if( ! update ) { 00916 // Make a directory for all SFrame related metadata. Might want to 00917 // add other metadata types as well to the output files later on. 00918 TDirectory* sframeDir = outputFile.GetDirectory( "SFrame" ); 00919 if( ! sframeDir ) { 00920 sframeDir = outputFile.mkdir( "SFrame" ); 00921 } 00922 sframeDir->cd(); 00923 00924 // Create a TObjString out of the cycle configuration, and write it 00925 // out: 00926 TObjString configString( config ); 00927 configString.Write( "CycleConfiguration" ); 00928 } 00929 00930 // 00931 // Write and close the output file: 00932 // 00933 outputFile.Write(); 00934 outputFile.Close(); 00935 00936 // 00937 // Merge the TTree contents of the temporary files into our output file: 00938 // 00939 if( filesToMerge.size() ) { 00940 00941 m_logger << DEBUG << "Merging disk-resident TTrees into \"" 00942 << filename << "\"" << SLogger::endmsg; 00943 00944 // Merge the file(s) into the output file using SFileMerger: 00945 SFileMerger merger; 00946 for( std::vector< TString >::const_iterator mfile = filesToMerge.begin(); 00947 mfile != filesToMerge.end(); ++mfile ) { 00948 if( ! merger.AddFile( *mfile ) ) { 00949 REPORT_ERROR( "Failed to add file \"" << *mfile 00950 << "\" to the merger" ); 00951 continue; 00952 } 00953 } 00954 if( ! merger.OutputFile( filename, "UPDATE" ) ) { 00955 REPORT_ERROR( "Failed to specify \"" << filename << "\" as the output " 00956 << "file name for the merging" ); 00957 } else { 00958 if( ! merger.Merge() ) { 00959 REPORT_ERROR( "Failed to execute the file merging" ); 00960 } 00961 } 00962 00963 // Remove the temporary files: 00964 for( std::vector< TString >::const_iterator mfile = filesToMerge.begin(); 00965 mfile != filesToMerge.end(); ++mfile ) { 00966 gSystem->Unlink( *mfile ); 00967 // This is not too nice, but for LOCAL running we also have to remove 00968 // the temporary directory that the file was in: 00969 if( mfile->Contains( SFrame::ProofOutputFileName ) ) { 00970 TString dirname = gSystem->DirName( *mfile ); 00971 if( dirname != "." ) { 00972 gSystem->Unlink( dirname ); 00973 } 00974 } 00975 } 00976 } 00977 00978 return; 00979 }