SFrame 3.6
core/src/SCycleController.cxx
Go to the documentation of this file.
00001 // $Id: SCycleController.cxx 335 2012-11-21 14:11:47Z krasznaa $
00002 /***************************************************************************
00003  * @Project: SFrame - ROOT-based analysis framework for ATLAS
00004  * @Package: Core
00005  *
00006  * @author Stefan Ask       <Stefan.Ask@cern.ch>           - Manchester
00007  * @author David Berge      <David.Berge@cern.ch>          - CERN
00008  * @author Johannes Haller  <Johannes.Haller@cern.ch>      - Hamburg
00009  * @author A. Krasznahorkay <Attila.Krasznahorkay@cern.ch> - CERN/Debrecen
00010  *
00011  ***************************************************************************/
00012 
00013 // STL include(s):
00014 #include <iomanip>
00015 #include <iostream>
00016 #include <sstream>
00017 #include <cstdlib>
00018 #include <limits>
00019 
00020 // ROOT include(s):
00021 #include <TDOMParser.h>
00022 #include <TXMLNode.h>
00023 #include <TXMLDocument.h>
00024 #include <TXMLAttr.h>
00025 #include <TStopwatch.h>
00026 #include <TSystem.h>
00027 #include <TClass.h>
00028 #include <TROOT.h>
00029 #include <TPython.h>
00030 #include <TChain.h>
00031 #include <TList.h>
00032 #include <TFile.h>
00033 #include <TProof.h>
00034 #include <TProofOutputFile.h>
00035 #include <TDSet.h>
00036 #include <TFileCollection.h>
00037 #include <THashList.h>
00038 #include <TFileInfo.h>
00039 #include <TObjString.h>
00040 
00041 // Local include(s):
00042 #include "../include/SCycleController.h"
00043 #include "../include/ISCycleBase.h"
00044 #include "../include/SLogWriter.h"
00045 #include "../include/SConstants.h"
00046 #include "../include/SParLocator.h"
00047 #include "../include/SCycleStatistics.h"
00048 #include "../include/SFileMerger.h"
00049 #include "../include/SOutputFile.h"
00050 #include "../include/SCycleConfig.h"
00051 #include "../include/SCycleOutput.h"
00052 #include "../include/SProofManager.h"
00053 
00062 SCycleController::SCycleController( const TString& xmlConfigFile )
00063    : m_curCycle( 0 ), m_isInitialized( kFALSE ), m_xmlConfigFile( xmlConfigFile ),
00064      m_proof( 0 ), m_logger( "SCycleController" ) {
00065 
00066 }
00067 
00074 SCycleController::~SCycleController() {
00075 
00076    std::vector< ISCycleBase* >::const_iterator it = m_analysisCycles.begin();
00077    for( ; it != m_analysisCycles.end(); ++it) {
00078       delete ( *it );
00079    }
00080 
00081    ShutDownProof();
00082 }
00083 
00092 void SCycleController::Initialize() throw( SError ) {
00093 
00094    m_logger << INFO << "Initializing" << SLogger::endmsg;
00095 
00096    // Just for kicks, lets measure the time it needs to initialise the
00097    // analysis:
00098    TStopwatch timer;
00099    timer.Start();
00100 
00101    // first clean up everything in case this is called multiple times
00102    m_curCycle = 0;
00103    this->DeleteAllAnalysisCycles();
00104    m_parPackages.clear();
00105 
00106    // --------------- xml read
00107    m_logger << INFO << "read xml file: '" << m_xmlConfigFile << "'" << SLogger::endmsg;
00108 
00109    TDOMParser xmlparser;
00110 
00111    // This is a new feature only available in the newest ROOT
00112    // nightlies. It makes it possible to have the input file
00113    // definitions in external XML files that are imported in
00114    // the main configuration XML file. It's conventient when
00115    // using a lot of the same input files in different cycles.
00116 #if ROOT_VERSION_CODE >= ROOT_VERSION( 5, 17, 4 )
00117    xmlparser.SetReplaceEntities( kTRUE );
00118 #endif
00119 
00120    Int_t parseError = xmlparser.ParseFile( m_xmlConfigFile );
00121    if( parseError ) {
00122       SError error( SError::StopExecution );
00123       error << "Loading of xml document \"" << m_xmlConfigFile
00124             << "\" failed";
00125       throw error;
00126    }
00127 
00128    // --------------- interpret xml file
00129 
00130    // read cycles and libraries
00131 
00132    TXMLDocument* xmldoc = xmlparser.GetXMLDocument();
00133    TXMLNode* rootNode = xmldoc->GetRootNode();
00134 
00135    if( rootNode->GetNodeName() == TString( "JobConfiguration" ) ) {
00136       std::string jobName = "";
00137       std::string outputLevelString = "";
00138       TListIter attribIt( rootNode->GetAttributes() );
00139       TXMLAttr* curAttr( 0 );
00140       while ( (curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) {
00141          if( curAttr->GetName() == TString( "JobName" ) )
00142             jobName = curAttr->GetValue();
00143          else if( curAttr->GetName() == TString( "OutputLevel" ) )
00144             outputLevelString = curAttr->GetValue();
00145       }
00146       SMsgType type = INFO;
00147       if     ( outputLevelString == "VERBOSE" ) type = VERBOSE;
00148       else if( outputLevelString == "DEBUG" )   type = DEBUG;
00149       else if( outputLevelString == "INFO" )    type = INFO;
00150       else if( outputLevelString == "WARNING" ) type = WARNING;
00151       else if( outputLevelString == "ERROR" )   type = ERROR;
00152       else if( outputLevelString == "FATAL" )   type = FATAL;
00153       else if( outputLevelString == "ALWAYS" )  type = ALWAYS;
00154       else {
00155          m_logger << WARNING << "Message output level ("
00156                   << outputLevelString << ") not recognized"
00157                   << SLogger::endmsg;
00158       }
00159       SLogWriter::Instance()->SetMinType( type );
00160 
00161       TXMLNode* nodes = rootNode->GetChildren();
00162 
00163       // now loop over nodes
00164       while( nodes != 0 ) {
00165          if( ! nodes->HasAttributes() ) {
00166             nodes = nodes->GetNextNode();
00167             continue;
00168          }
00169 
00170          try { // For catching "cycle level" problems...
00171 
00172             if( nodes->GetNodeName() == TString( "Cycle" ) ) {
00173 
00174                std::string cycleName = "";
00175 
00176                attribIt = nodes->GetAttributes();
00177                curAttr = 0;
00178                while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) {
00179                   if( curAttr->GetName() == TString( "Name" ) )
00180                      cycleName = curAttr->GetValue();
00181                }
00182 
00183                TClass* cycleClass = gROOT->GetClass( cycleName.c_str(), true );
00184                if( ! cycleClass || ! cycleClass->InheritsFrom( "ISCycleBase" ) ) {
00185                   SError error( SError::SkipCycle );
00186                   error << "Loading of class \"" << cycleName << "\" failed";
00187                   throw error;
00188                }
00189 
00190                ISCycleBase* cycle = reinterpret_cast< ISCycleBase* >( cycleClass->New() );
00191 
00192                m_logger << INFO << "Created cycle '" << cycleName << "'"
00193                         << SLogger::endmsg;
00194 
00195                cycle->Initialize( nodes );
00196                this->AddAnalysisCycle( cycle );
00197 
00198             } else if( nodes->GetNodeName() == TString( "Library" ) ) {
00199 
00200                std::string libraryName = "";
00201                attribIt = nodes->GetAttributes();
00202                curAttr = 0;
00203                while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) {
00204                   if( curAttr->GetName() == TString( "Name" ) )
00205                      libraryName = curAttr->GetValue();
00206                }
00207                REPORT_VERBOSE( "Trying to load library \"" << libraryName << "\"" );
00208 
00209                int ret = 0;
00210                if( ( ret = gSystem->Load( libraryName.c_str() ) ) >= 0 ) {
00211                   m_logger << DEBUG << "Library loaded: \"" << libraryName << "\"" 
00212                            << SLogger::endmsg;
00213                } else {
00214                   SError error( SError::StopExecution );
00215                   error << "Library failed to load: \"" << libraryName
00216                         << "\"\nRet. Val.: " << ret;
00217                   throw error;
00218                }
00219 
00220             } else if( nodes->GetNodeName() == TString( "PyLibrary" ) ) {
00221 
00222                std::string libraryName = "";
00223                attribIt = nodes->GetAttributes();
00224                curAttr = 0;
00225                while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) {
00226                   if( curAttr->GetName() == TString( "Name" ) )
00227                      libraryName = curAttr->GetValue();
00228                }
00229                REPORT_VERBOSE( "Trying to load python library \"" << libraryName
00230                                << "\"" );
00231 
00232                std::ostringstream command;
00233                command << "import " << libraryName;
00234 
00235                TPython::Exec( command.str().c_str() );
00236 
00237             } else if( nodes->GetNodeName() == TString( "Package" ) ) {
00238 
00239                TString packageName;
00240                attribIt = nodes->GetAttributes();
00241                curAttr = 0;
00242                while ( ( curAttr = dynamic_cast< TXMLAttr* >( attribIt() ) ) != 0 ) {
00243                   if( curAttr->GetName() == TString( "Name" ) )
00244                      packageName = curAttr->GetValue();
00245                }
00246                m_logger << DEBUG << "Using PROOF ARchive package: " << packageName
00247                         << SLogger::endmsg;
00248 
00249                m_parPackages.push_back( packageName );
00250 
00251             }
00252 
00253          } catch( const SError& error ) {
00254             //
00255             // This is where I catch "cycle level" problems:
00256             //
00257             if( error.request() <= SError::SkipCycle ) {
00258                // If just this cycle has to be skipped:
00259                REPORT_ERROR( "Exception caught while processing node: "
00260                              << nodes->GetNodeName() );
00261                REPORT_ERROR( "Message: " << error.what() );
00262                REPORT_ERROR( "--> Skipping cycle!" );
00263 
00264                nodes = nodes->GetNextNode();
00265                continue;
00266             } else {
00267                // If this is more serious:
00268                throw;
00269             }
00270          }
00271 
00272          nodes = nodes->GetNextNode();
00273 
00274       } // end loop over nodes
00275 
00276       m_logger << INFO << "Job '" << jobName << "' configured" << SLogger::endmsg;
00277 
00278    } else {
00279       SError error( SError::StopExecution );
00280       error << "XML root node " << rootNode->GetNodeName()
00281             << " has wrong format";
00282       throw error;
00283    }
00284 
00285    // --------------- end of xml interpretation
00286 
00287    // Print how much time it took to initialise the analysis:
00288    timer.Stop();
00289    m_logger << INFO << "Time needed for initialisation: " << std::setw( 6 )
00290             << std::setprecision( 2 ) << timer.RealTime() << " s"
00291             << SLogger::endmsg;
00292 
00293    // Print memory consumption after initialising the analysis:
00294    ProcInfo_t procinfo;
00295    gSystem->GetProcInfo( &procinfo );
00296    m_logger << DEBUG << "Memory consumption after initialisation:" << SLogger::endmsg;
00297    m_logger.setf( std::ios::fixed );
00298    m_logger << DEBUG << "  Resident mem.: " << std::setw( 7 ) << procinfo.fMemResident
00299             << " kB; Virtual mem.: " << std::setw( 7 ) << procinfo.fMemVirtual
00300             << " kB" << SLogger::endmsg;
00301 
00302    // set object status to be ready
00303    m_isInitialized = kTRUE;
00304 
00305    return;
00306 }
00307 
00319 void SCycleController::ExecuteAllCycles() throw( SError ) {
00320 
00321    if( ! m_isInitialized ) {
00322       throw SError( "SCycleController is not initialized",
00323                     SError::StopExecution );
00324    }
00325 
00326    m_logger << INFO << "Entering ExecuteAllCycles()" << SLogger::endmsg;
00327 
00328    std::vector< ISCycleBase* >::const_iterator it = m_analysisCycles.begin();
00329    for( ; it != m_analysisCycles.end(); ++it ) {
00330       this->ExecuteNextCycle();
00331    }
00332 
00333    return;
00334 }
00335 
00344 void SCycleController::ExecuteNextCycle() throw( SError ) {
00345 
00346    if( ! m_isInitialized ) {
00347       throw SError( "SCycleController is not initialized",
00348                     SError::StopExecution );
00349    }
00350 
00351    //
00352    // Measure the total time needed for this cycle:
00353    //
00354    TStopwatch timer;
00355    timer.Start();
00356 
00357    //
00358    // Access the current cycle:
00359    //
00360    ISCycleBase* cycle     = m_analysisCycles.at( m_curCycle );
00361    TString      cycleName = cycle->GetName();
00362 
00363    //
00364    // Create a copy of the cycle configuration, so that it can be given to PROOF:
00365    //
00366    SCycleConfig config = cycle->GetConfig();
00367    config.SetName( SFrame::CycleConfigName );
00368    config.ArrangeInputData(); // To handle multiple ID of the same type...
00369    config.ValidateInput(); // This is needed for the proper weighting...
00370    config.SetMsgLevel( SLogWriter::Instance()->GetMinType() ); // For the correct msg level...
00371    config.SetCycleName( cycle->GetName() ); // For technical reasons...
00372    cycle->SetConfig( config );
00373 
00374    m_logger << INFO << "Executing Cycle #" << m_curCycle << " ('" << cycleName << "') "
00375             << ( config.GetRunMode() == SCycleConfig::LOCAL ? "locally" :
00376                  "on PROOF" )
00377             << SLogger::endmsg;
00378 
00379    //
00380    // Make some initialisation steps before starting the cycle:
00381    //
00382    if( config.GetRunMode() == SCycleConfig::PROOF ) {
00383 
00384       //
00385       // Connect to the PROOF server:
00386       //
00387       InitProof( config.GetProofServer(), config.GetProofNodes() );
00388 
00389       //
00390       // Upload and compile all the packages specified in the configuration:
00391       //
00392       if( ! SProofManager::Instance()->IsConfigured( config.GetProofServer() ) ) {
00393          for( std::vector< TString >::const_iterator package = m_parPackages.begin();
00394               package != m_parPackages.end(); ++package ) {
00395 
00396             // Find the full path name of the package:
00397             TString pkg = SParLocator::Locate( *package );
00398             if( pkg == "" ) continue;
00399 
00400             REPORT_VERBOSE( "Uploading package: " << pkg );
00401             if( m_proof->UploadPackage( pkg ) ) {
00402                REPORT_ERROR( "There was a problem with uploading "
00403                              << *package );
00404                throw SError( *package + " could not be uploaded to PROOF",
00405                              SError::SkipCycle );
00406             }
00407 
00408             Ssiz_t slash_pos = pkg.Last( '/' );
00409             pkg.Remove( 0, slash_pos + 1 );
00410             if( pkg.EndsWith( ".par", TString::kIgnoreCase ) ) {
00411                pkg.Remove( pkg.Length() - 4, 4 );
00412             }
00413 
00414             m_logger << INFO << "Enabling package: " << pkg << SLogger::endmsg;
00415             if( m_proof->EnablePackage( pkg, kTRUE ) ) {
00416                REPORT_ERROR( "There was a problem with enabling "
00417                              << *package );
00418                throw SError( *package + " could not be enabled on PROOF",
00419                              SError::SkipCycle );
00420             }
00421          }
00422       }
00423       SProofManager::Instance()->SetConfigured( config.GetProofServer() );
00424 
00425    }
00426 
00427    // Number of processed events:
00428    Long64_t procev = 0;
00429    // Number of skipped events:
00430    Long64_t skipev = 0;
00431 
00432    //
00433    // The begin cycle function has to be called here by hand:
00434    //
00435    cycle->BeginCycle();
00436 
00437    //
00438    // Loop over all defined input data types:
00439    //
00440    for( SCycleConfig::id_type::const_iterator id = config.GetInputData().begin();
00441         id != config.GetInputData().end(); ++id ) {
00442 
00443       //
00444       // Decide how to write the output file at the end of processing this InputData.
00445       // The InputData objects should be arranged by their type at this point...
00446       //
00447       Bool_t updateOutput = kFALSE;
00448       SCycleConfig::id_type::const_iterator previous_id = id;
00449       if( previous_id == config.GetInputData().begin() ) {
00450          updateOutput = kFALSE;
00451          REPORT_VERBOSE( "New output file will be opened for ID type: "
00452                          << id->GetType() );
00453       } else {
00454          --previous_id;
00455          if( ( previous_id->GetType() == id->GetType() ) &&
00456              ( previous_id->GetVersion() == id->GetVersion() ) ) {
00457             updateOutput = kTRUE;
00458             REPORT_VERBOSE( "Output file will be updated for ID type: "
00459                             << id->GetType() );
00460          } else {
00461             updateOutput = kFALSE;
00462             REPORT_VERBOSE( "New output file will be opened for ID type: "
00463                             << id->GetType() );
00464          }
00465       }
00466 
00467       //
00468       // Each input data has to have at least one input tree:
00469       //
00470       if( ! id->HasInputTrees() ) {
00471          REPORT_ERROR( "No input trees defined in input data " << id->GetType() );
00472          REPORT_ERROR( "Skipping it from processing" );
00473          continue;
00474       }
00475 
00476       // Find the first event-level input tree in the configuration:
00477       REPORT_VERBOSE( "Finding the name of the main event-level input TTree..." );
00478       const char* treeName = 0;
00479       for( std::map< Int_t, std::vector< STree > >::const_iterator trees =
00480               id->GetTrees().begin(); trees != id->GetTrees().end(); ++trees ) {
00481          for( std::vector< STree >::const_iterator st = trees->second.begin();
00482               st != trees->second.end(); ++st ) {
00483             if( ( st->type & STree::INPUT_TREE ) &&
00484                 ( st->type & STree::EVENT_TREE ) ) {
00485                treeName = st->treeName.Data();
00486                break;
00487             }
00488          }
00489       }
00490       if( ! treeName ) {
00491          REPORT_ERROR( "Can't determine input TTree name for input data "
00492                        << id->GetType() );
00493          REPORT_ERROR( "Skipping it from processing" );
00494          continue;
00495       } else {
00496          REPORT_VERBOSE( "The name of the main event-level input TTree is: "
00497                          << treeName );
00498       }
00499 
00500       m_logger << INFO << "Processing input data type: " << id->GetType()
00501                << " version: " << id->GetVersion() << SLogger::endmsg;
00502 
00503       //
00504       // Create a copy of the input data configuration, so that it can be
00505       // given to PROOF:
00506       //
00507       SInputData inputData = *id;
00508       inputData.SetName( SFrame::CurrentInputDataName );
00509 
00510       //
00511       // Retrieve the configuration object list from the cycle:
00512       //
00513       const TList& configList = cycle->GetConfigurationObjects();
00514 
00515       //
00516       // Calculate how many events to process:
00517       //
00518       const Long64_t evmax = ( id->GetNEventsMax() == -1 ?
00519                                std::numeric_limits< Long64_t >::max() :
00520                                id->GetNEventsMax() );
00521 
00522       // This will point to the created output objects:
00523       TList* outputs = 0;
00524 
00525       //
00526       // The cycle can be run in two modes:
00527       //
00528       if( config.GetRunMode() == SCycleConfig::LOCAL ) {
00529 
00530          if( id->GetDataSets().size() ) {
00531             REPORT_ERROR( "Can't use DataSet-s as input in LOCAL mode!" );
00532             REPORT_ERROR( "Skipping InputData type: " << id->GetType()
00533                           << " version: " << id->GetVersion() );
00534             continue;
00535          }
00536 
00537          //
00538          // Create a chain with all the specified input files:
00539          //
00540          REPORT_VERBOSE( "Creating TChain to run the cycle on..." );
00541          TChain chain( treeName );
00542          for( std::vector< SFile >::const_iterator file = id->GetSFileIn().begin();
00543               file != id->GetSFileIn().end(); ++file ) {
00544             REPORT_VERBOSE( "Adding file: " << file->file );
00545             chain.AddFile( file->file );
00546          }
00547 
00548          //
00549          // Give the configuration to the cycle by hand:
00550          //
00551          TList list;
00552          list.Add( &config );
00553          list.Add( &inputData );
00554          for( Int_t i = 0; i < configList.GetSize(); ++i ) {
00555             list.Add( configList.At( i ) );
00556          }
00557          cycle->SetInputList( &list );
00558 
00559          //
00560          // Run the cycle:
00561          //
00562          chain.Process( cycle, "", evmax, id->GetNEventsSkip() );
00563          outputs = cycle->GetOutputList();
00564 
00565       } else if( config.GetRunMode() == SCycleConfig::PROOF ) {
00566 
00567          //
00568          // Check that the PROOF server is available and ready. For instance it's not
00569          // a good idea to send a job to a server that crashed on the previous
00570          // input data...
00571          //
00572          if( ! m_proof->IsValid() ) {
00573             REPORT_ERROR( "PROOF server doesn't seem to be available: "
00574                           << m_proof->GetManager()->GetUrl() );
00575             REPORT_ERROR( "Aborting execution of cycle!" );
00576             break;
00577          }
00578 
00579          // This object describes how to create the temporary PROOF output
00580          // files in the cycles:
00581          TNamed proofOutputFile( TString( SFrame::ProofOutputName ),
00582                                  ( config.GetProofWorkDir() == "" ? "./" :
00583                                    config.GetProofWorkDir() + "/" ) +
00584                                  cycle->GetName() + "-" + inputData.GetType() +
00585                                  "-" + inputData.GetVersion() +
00586                                  "-TempNTuple.root" );
00587 
00588          //
00589          // Clear the query results from memory (Thanks to Gerri!):
00590          //
00591          if( m_proof->GetQueryResults() ) {
00592             m_proof->GetQueryResults()->SetOwner( kTRUE );
00593             m_proof->GetQueryResults()->Clear();
00594             m_proof->GetQueryResults()->SetOwner( kFALSE );
00595          }
00596 
00597          //
00598          // Give the configuration to PROOF, and tweak it a little:
00599          //
00600          m_proof->ClearInput();
00601          // Only output a maximum of 10 messages per node about memory usage per query:
00602          const Long64_t eventsPerNode = ( inputData.GetEventsTotal() /
00603                                           m_proof->GetParallel() );
00604          m_proof->SetParameter( "PROOF_MemLogFreq",
00605                                 ( Long64_t ) ( eventsPerNode > 10000 ?
00606                                                ( eventsPerNode / 10 ) :
00607                                                1000 ) );
00608          // Make sure that we can use as many workers per node as we want:
00609          m_proof->SetParameter( "PROOF_MaxSlavesPerNode", ( Long_t ) 9999999 );
00610          // Configure the usage of TTreeCache on the cluster:
00611          if( config.GetUseTreeCache() ) {
00612             m_proof->SetParameter( "PROOF_UseTreeCache", ( Int_t ) 1 );
00613          } else {
00614             m_proof->SetParameter( "PROOF_UseTreeCache", ( Int_t ) 0 );
00615          }
00616          m_proof->SetParameter( "PROOF_CacheSize", config.GetCacheSize() );
00617          // Configure whether the workers are allowed to read each others' files:
00618          if( config.GetProcessOnlyLocal() ) {
00619             m_proof->SetParameter( "PROOF_ForceLocal", ( Int_t ) 1 );
00620          } else {
00621             m_proof->SetParameter( "PROOF_ForceLocal", ( Int_t ) 0 );
00622          }
00623 
00624          // Add the "input objects" to PROOF:
00625          m_proof->AddInput( &config );
00626          m_proof->AddInput( &inputData );
00627          m_proof->AddInput( &proofOutputFile );
00628          for( Int_t i = 0; i < configList.GetSize(); ++i ) {
00629             m_proof->AddInput( configList.At( i ) );
00630          }
00631 
00632          if( id->GetDataSets().size() ) {
00633 
00634             // Merge the dataset names in the way that PROOF expects them. This is
00635             // "<dataset 1>|<dataset 2>|...". Note that this only works in ROOT
00636             // versions newer than 5.27/02, but SInputData should take care about
00637             // removing multiple datasets when using an "old" ROOT release.
00638             TString dsets = "";
00639             for( std::vector< SDataSet >::const_iterator ds = id->GetDataSets().begin();
00640                  ds != id->GetDataSets().end(); ++ds ) {
00641 
00642                if( ds != id->GetDataSets().begin() ) {
00643                   dsets += "|";
00644                }
00645                dsets += ds->name + "#" + treeName;
00646             }
00647 
00648             // Process the events:
00649             if( m_proof->Process( dsets, cycle->GetName(), "", evmax,
00650                                   id->GetNEventsSkip() ) == -1 ) {
00651                REPORT_ERROR( "There was an error processing:" );
00652                REPORT_ERROR( "  Cycle      = " << cycle->GetName() );
00653                REPORT_ERROR( "  ID type    = " << inputData.GetType() );
00654                REPORT_ERROR( "  ID version = " << inputData.GetVersion() );
00655                REPORT_ERROR( "Stopping the execution of this cycle!" );
00656                break;
00657             }
00658 
00659          } else if( id->GetSFileIn().size() ) {
00660 
00661             //
00662             // Check if the validation was skipped. If it was, then the SInputData objects
00663             // didn't create a TDSet object of its own. So we have to create a simple one
00664             // here. Otherwise just use the TDSet created by SInputData.
00665             //
00666             if( id->GetSkipValid() ) {
00667 
00668                // Create the dataset object first:
00669                TChain chain( treeName );
00670                for( std::vector< SFile >::const_iterator file = id->GetSFileIn().begin();
00671                     file != id->GetSFileIn().end(); ++file ) {
00672                   chain.Add( file->file );
00673                }
00674                TDSet set( chain );
00675 
00676                // Process the events:
00677                if( m_proof->Process( &set, cycle->GetName(), "", evmax,
00678                                      id->GetNEventsSkip() ) == -1 ) {
00679                   REPORT_ERROR( "There was an error processing:" );
00680                   REPORT_ERROR( "  Cycle      = " << cycle->GetName() );
00681                   REPORT_ERROR( "  ID type    = " << inputData.GetType() );
00682                   REPORT_ERROR( "  ID version = " << inputData.GetVersion() );
00683                   REPORT_ERROR( "Stopping the execution of this cycle!" );
00684                   break;
00685                }
00686 
00687             } else {
00688 
00689                //
00690                // Run the cycle on PROOF. Unfortunately the checking of the "successfullness"
00691                // of the PROOF job is not working too well... Even after a *lot* of error
00692                // messages the TProof::Process(...) command can still return a success code,
00693                // which can lead to nasty crashes...
00694                //
00695                if( m_proof->Process( id->GetDSet(), cycle->GetName(), "", evmax,
00696                                      id->GetNEventsSkip() ) == -1 ) {
00697                   REPORT_ERROR( "There was an error processing:" );
00698                   REPORT_ERROR( "  Cycle      = " << cycle->GetName() );
00699                   REPORT_ERROR( "  ID type    = " << inputData.GetType() );
00700                   REPORT_ERROR( "  ID version = " << inputData.GetVersion() );
00701                   REPORT_ERROR( "Stopping the execution of this cycle!" );
00702                   break;
00703                }
00704             }
00705 
00706          } else {
00707             REPORT_ERROR( "Nothing was executed using PROOF!" );
00708          }
00709 
00710          // The missing file accounting only started in ROOT 5.28 as far as I can tell:
00711 #if ROOT_VERSION_CODE >= ROOT_VERSION( 5, 28, 00 )
00712          // Only do this for non-Lite PROOF:
00713          if( ! m_proof->IsLite() ) {
00714             // Get the list of missing files:
00715             TFileCollection* missing = m_proof->GetMissingFiles();
00716             if( missing ) {
00717                // Get the list of files:
00718                THashList* flist = missing->GetList();
00719                if( flist->GetEntries() ) {
00720                   m_logger << WARNING << "The following files were not processed:" << SLogger::endmsg;
00721                   for( Int_t i = 0; i < flist->GetEntries(); ++i ) {
00722                      TFileInfo* finfo = dynamic_cast< TFileInfo* >( flist->At( i ) );
00723                      if( ! finfo ) {
00724                         REPORT_ERROR( "Missing file list not in expected format" );
00725                         continue;
00726                      }
00727                      m_logger << "    " << finfo->GetCurrentUrl()->GetUrl() << SLogger::endmsg;
00728                   }
00729                }
00730                // Remove the object:
00731                delete missing;
00732                missing = 0;
00733             }
00734          }
00735 #endif // ROOT_VERSION( 5, 28, 00 )
00736 
00737          outputs = m_proof->GetOutputList();
00738 
00739       } else {
00740          throw SError( "Running mode not recognised!", SError::SkipCycle );
00741       }
00742 
00743       if( ! outputs ) {
00744          REPORT_ERROR( "Cycle output could not be retrieved." );
00745          REPORT_ERROR( "NOT writing the output of cycle \""
00746                        << cycle->GetName() << "\", ID \"" << inputData.GetType()
00747                        << "\", Version \"" << inputData.GetVersion() << "\"" );
00748          continue;
00749       }
00750 
00751       //
00752       // Collect the statistics from this input data:
00753       //
00754       SCycleStatistics* stat =
00755          dynamic_cast< SCycleStatistics* >( outputs->FindObject( SFrame::RunStatisticsName ) );
00756       if( stat ) {
00757          procev += stat->GetProcessedEvents();
00758          skipev += stat->GetSkippedEvents();
00759       } else {
00760          m_logger << WARNING << "Cycle statistics not received from: "
00761                   << cycle->GetName() << SLogger::endmsg;
00762          m_logger << WARNING << "Printed statistics will not be correct!"
00763                   << SLogger::endmsg;
00764       }
00765 
00766       //
00767       // Write out the objects produced by the cycle:
00768       //
00769       TString outputFileName = config.GetOutputDirectory() + cycleName + "." +
00770          id->GetType() + "." + id->GetVersion() + config.GetPostFix() + ".root";
00771       outputFileName.ReplaceAll( "::", "." );
00772       WriteCycleOutput( outputs, outputFileName,
00773                         config.GetStringConfig( &inputData ),
00774                         updateOutput );
00775 
00776       // This cleanup is giving me endless trouble on the NYU Tier3 with
00777       // ROOT 5.28c. So, knowing no better solution, I just disabled it
00778       // on new ROOT versions for now...
00779 #if ROOT_VERSION_CODE < ROOT_VERSION( 5, 28, 0 )
00780       outputs->SetOwner( kTRUE );
00781 #endif
00782       outputs->Clear();
00783 
00784    }
00785 
00786    //
00787    // The end cycle function has to be called here by hand:
00788    //
00789    cycle->EndCycle();
00790 
00791    // The cycle processing is done at this point:
00792    timer.Stop();
00793 
00794    m_logger << INFO << "Overall cycle statistics:" << SLogger::endmsg;
00795    m_logger.setf( std::ios::fixed );
00796    m_logger << INFO << std::setw( 10 ) << std::setfill( ' ' ) << std::setprecision( 0 )
00797             << procev << " Events - Real time " << std::setw( 6 ) << std::setprecision( 2 )
00798             << timer.RealTime() << " s  - " << std::setw( 5 )
00799             << std::setprecision( 0 ) << ( procev / timer.RealTime() ) << " Hz | CPU time "
00800             << std::setw( 6 ) << std::setprecision( 2 ) << timer.CpuTime() << " s  - "
00801             << std::setw( 5 ) << std::setprecision( 0 ) << ( procev / timer.CpuTime() )
00802             << " Hz" << SLogger::endmsg;
00803 
00804    ++m_curCycle;
00805    return;
00806 }
00807 
00816 void SCycleController::AddAnalysisCycle( ISCycleBase* cycleAlg ) {
00817 
00818    m_analysisCycles.push_back( cycleAlg );
00819    return;
00820 }
00821 
00825 void SCycleController::DeleteAllAnalysisCycles() {
00826 
00827    m_logger << INFO << "Deleting all analysis cycle algorithms from memory"
00828             << SLogger::endmsg;
00829 
00830    std::vector< ISCycleBase* >::const_iterator it = m_analysisCycles.begin();
00831    for( ; it != m_analysisCycles.end(); ++it ) {
00832       delete ( *it );
00833    }
00834 
00835    m_analysisCycles.clear();
00836 
00837    return;
00838 }
00839 
00840 void SCycleController::InitProof( const TString& server, const Int_t& nodes ) {
00841 
00842    //
00843    // Open the connection:
00844    //
00845    m_logger << INFO << "Opening PROOF connection to: " << server
00846             << SLogger::endmsg;
00847    m_proof = SProofManager::Instance()->Open( server );
00848    if( nodes > 0 ) m_proof->SetParallel( nodes );
00849 
00850    return;
00851 }
00852 
00853 void SCycleController::ShutDownProof() {
00854 
00855    //
00856    // Clean up the PROOF connection(s):
00857    //
00858    SProofManager::Instance()->Cleanup();
00859    m_proof = 0;
00860 
00861    return;
00862 }
00863 
00864 void SCycleController::WriteCycleOutput( TList* olist,
00865                                          const TString& filename,
00866                                          const TString& config,
00867                                          Bool_t update ) const {
00868 
00869    m_logger << INFO << "Writing output of \""
00870             << m_analysisCycles.at( m_curCycle )->GetName() << "\" to: "
00871             << filename << SLogger::endmsg;
00872 
00873    //
00874    // Open the output file:
00875    //
00876    TFile outputFile( filename , ( update ? "UPDATE" : "RECREATE" ) );
00877 
00878    //
00879    // List of files holding TTrees:
00880    //
00881    std::vector< TString > filesToMerge;
00882 
00883    //
00884    // Merge the memory objects into the output file:
00885    //
00886    for( Int_t i = 0; i < olist->GetSize(); ++i ) {
00887 
00888       outputFile.cd();
00889 
00890       if( dynamic_cast< SCycleOutput* >( olist->At( i ) ) ) {
00891          olist->At( i )->Write();
00892          m_logger << DEBUG << "Written object: " << olist->At( i )->GetName()
00893                   << SLogger::endmsg;
00894       } else if( dynamic_cast< TProofOutputFile* >( olist->At( i ) ) ) {
00895          TProofOutputFile* pfile = dynamic_cast< TProofOutputFile* >( olist->At( i ) );
00896          filesToMerge.push_back( pfile->GetOutputFileName() );
00897       } else if ( dynamic_cast< SOutputFile* >( olist->At( i ) ) ) {
00898          SOutputFile* sfile = dynamic_cast< SOutputFile* >( olist->At( i ) );
00899          filesToMerge.push_back( sfile->GetFileName() );
00900       } else {
00901          /*
00902          TDirectory* proofdir = outputFile.GetDirectory( "PROOF" );
00903          if( ! proofdir ) {
00904             proofdir = outputFile.mkdir( "PROOF", "PROOF related objects" );
00905          }
00906          proofdir->cd();
00907          olist->At( i )->Write();
00908          */
00909       }
00910    }
00911 
00912    //
00913    // Add the cycle configuration as metadata to the output file:
00914    //
00915    if( ! update ) {
00916       // Make a directory for all SFrame related metadata. Might want to
00917       // add other metadata types as well to the output files later on.
00918       TDirectory* sframeDir = outputFile.GetDirectory( "SFrame" );
00919       if( ! sframeDir ) {
00920          sframeDir = outputFile.mkdir( "SFrame" );
00921       }
00922       sframeDir->cd();
00923 
00924       // Create a TObjString out of the cycle configuration, and write it
00925       // out:
00926       TObjString configString( config );
00927       configString.Write( "CycleConfiguration" );
00928    }
00929 
00930    //
00931    // Write and close the output file:
00932    //
00933    outputFile.Write();
00934    outputFile.Close();
00935 
00936    //
00937    // Merge the TTree contents of the temporary files into our output file:
00938    //
00939    if( filesToMerge.size() ) {
00940 
00941       m_logger << DEBUG << "Merging disk-resident TTrees into \""
00942                << filename << "\"" << SLogger::endmsg;
00943 
00944       // Merge the file(s) into the output file using SFileMerger:
00945       SFileMerger merger;
00946       for( std::vector< TString >::const_iterator mfile = filesToMerge.begin();
00947            mfile != filesToMerge.end(); ++mfile ) {
00948          if( ! merger.AddFile( *mfile ) ) {
00949             REPORT_ERROR( "Failed to add file \"" << *mfile
00950                           << "\" to the merger" );
00951             continue;
00952          }
00953       }
00954       if( ! merger.OutputFile( filename, "UPDATE" ) ) {
00955          REPORT_ERROR( "Failed to specify \"" << filename << "\" as the output "
00956                        << "file name for the merging" );
00957       } else {
00958          if( ! merger.Merge() ) {
00959             REPORT_ERROR( "Failed to execute the file merging" );
00960          }
00961       }
00962 
00963       // Remove the temporary files:
00964       for( std::vector< TString >::const_iterator mfile = filesToMerge.begin();
00965            mfile != filesToMerge.end(); ++mfile ) {
00966          gSystem->Unlink( *mfile );
00967          // This is not too nice, but for LOCAL running we also have to remove
00968          // the temporary directory that the file was in:
00969          if( mfile->Contains( SFrame::ProofOutputFileName ) ) {
00970             TString dirname = gSystem->DirName( *mfile );
00971             if( dirname != "." ) {
00972                gSystem->Unlink( dirname );
00973             }
00974          }
00975       }
00976    }
00977 
00978    return;
00979 }