SFrame 3.6
core/src/SProofManager.cxx
Go to the documentation of this file.
00001 // $Id: SProofManager.cxx 318 2012-06-22 16:32:42Z krasznaa $
00002 /***************************************************************************
00003  * @Project: SFrame - ROOT-based analysis framework for ATLAS
00004  * @Package: Core
00005  *
00006  * @author Stefan Ask       <Stefan.Ask@cern.ch>           - Manchester
00007  * @author David Berge      <David.Berge@cern.ch>          - CERN
00008  * @author Johannes Haller  <Johannes.Haller@cern.ch>      - Hamburg
00009  * @author A. Krasznahorkay <Attila.Krasznahorkay@cern.ch> - CERN/Debrecen
00010  *
00011  ***************************************************************************/
00012 
// STL include(s):
#include <set>

// ROOT include(s):
#include <TProof.h>
#include <TProofMgr.h>
#include <TProofLog.h>
#include <TMacro.h>
#include <TList.h>
#include <TObjArray.h>
#include <TObjString.h>

// Local include(s):
#include "../include/SProofManager.h"
00024 
00025 // Initialize the static variable:
00026 SProofManager* SProofManager::m_instance = 0;
00027 
00032 SProofManager::~SProofManager() {
00033 
00034    Cleanup();
00035 }
00036 
00042 SProofManager* SProofManager::Instance() {
00043 
00044    if( ! m_instance ) {
00045       m_instance = new SProofManager();
00046    }
00047 
00048    return m_instance;
00049 }
00050 
00064 TProof* SProofManager::Open( const TString& url,
00065                              const TString& param ) throw( SError ) {
00066 
00067    // Copy the contents of the parameters, as we may have to change them:
00068    TString urlcopy( url );
00069    TString paramcopy( param );
00070 
00071    // The user may specify some special additional tags in the URL string,
00072    // which have to be searched for here:
00073    TObjArray* tokens = url.Tokenize( ";" );
00074    if( tokens->GetEntries() > 1 ) {
00075       TObjString* token = dynamic_cast< TObjString* >( tokens->At( 0 ) );
00076       if( ! token ) {
00077          REPORT_ERROR( "The tokenized array contains something that's not a "
00078                        "TObjString!" );
00079          delete tokens;
00080          throw SError( "Problems with tokenizing PROOF URL",
00081                        SError::SkipCycle );
00082       }
00083       urlcopy = token->GetString();
00084    }
00085    // We override the parameters given to the function if extra tokens are
00086    // found. Then again, the SFrame code never gives anything as "param" to the
00087    // function anyway. (May change at one point.)
00088    if( ( tokens->GetEntries() > 1 ) && ( paramcopy != "" ) ) {
00089       m_logger << WARNING << "Extra parameters provided both in the URL and "
00090                << "the extra parameters field. Using the one(s) from the "
00091                << "URL field." << SLogger::endmsg;
00092       paramcopy = "";
00093    }
00094    for( Int_t i = 1; i < tokens->GetEntries(); ++i ) {
00095       TObjString* tokenobj = dynamic_cast< TObjString* >( tokens->At( i ) );
00096       if( ! tokenobj ) {
00097          REPORT_ERROR( "The tokenized array contains something that's not a "
00098                        "TObjString!" );
00099          delete tokens;
00100          throw SError( "Problems with tokenizing PROOF URL",
00101                        SError::SkipCycle );
00102       }
00103       const TString token = tokenobj->GetString();
00104       // Did the user ask for the memory-leak profiling of the PROOF master?
00105       if( token == "MemProfMaster" ) {
00106          m_logger << INFO << "Running memory profiling on the master node"
00107                   << SLogger::endmsg;
00108          paramcopy = "valgrind=master";
00109          TProof::AddEnvVar( "PROOF_MASTER_WRAPPERCMD",
00110                             "valgrind_opts:--leak-check=full "
00111                             "--track-origins=yes --num-callers=32" );
00112       }
00113       // Did the user ask for the memory-leak profiling of the PROOF workers?
00114       else if( token == "MemProfWorkers" ) {
00115          m_logger << INFO << "Running memory profiling on the worker nodes"
00116                   << SLogger::endmsg;
00117          paramcopy = "valgrind=workers";
00118          TProof::AddEnvVar( "PROOF_SLAVE_WRAPPERCMD",
00119                             "valgrind_opts:--leak-check=full "
00120                             "--track-origins=yes --num-callers=32" );
00121       } else {
00122          REPORT_ERROR( "Unknown extra parameter specified: " << token );
00123       }
00124       // Extend the memory available to the PROOF processes in all memory
00125       // profiling jobs:
00126       if( token.BeginsWith( "MemProf" ) ) {
00127          // This should make sure that at least 10 GBs are available to the
00128          // process:
00129          TProof::AddEnvVar( "PROOF_RESMEMMAX",  "10000" );
00130          TProof::AddEnvVar( "PROOF_VIRTMEMMAX", "10000" );
00131       }
00132    }
00133    REPORT_VERBOSE( "Using URL: " << urlcopy << ", Param: " << paramcopy );
00134    // Clean up:
00135    delete tokens;
00136 
00137    // Check if the connection has already been opened. Notice that we're
00138    // using the original URL and parameters here.
00139    const ConnMap_t::key_type connection = std::make_pair( url, param );
00140    ConnMap_t::const_iterator conn;
00141    if( ( conn = m_connections.find( connection ) ) != m_connections.end() ) {
00142       m_logger << DEBUG << "Connection to \"" << url << "\" is already open"
00143                << SLogger::endmsg;
00144       return conn->second.first;
00145    }
00146 
00147    // Try to open the connection:
00148    TProof* server = TProof::Open( urlcopy, paramcopy );
00149    if( ! server ) {
00150       REPORT_ERROR( "Couldn't open connection to: " << url );
00151       throw SError( "Couldn't open connection to: " + url,
00152                     SError::SkipCycle );
00153    } else {
00154       m_logger << INFO << "Connection opened to \"" << url << "\""
00155                << SLogger::endmsg;
00156    }
00157 
00158    // Remember that the server is connected, but not initialized yet:
00159    m_connections[ connection ] = std::make_pair( server, kFALSE );
00160 
00161    return server;
00162 }
00163 
00174 Bool_t SProofManager::IsConfigured( const TString& url,
00175                                     const TString& param ) const {
00176    
00177    // Check if the connection has already been opened:
00178    ConnMap_t::key_type connection = std::make_pair( url, param );
00179    ConnMap_t::const_iterator conn = m_connections.find( connection );
00180    if( conn != m_connections.end() ) {
00181       // Return the configuration state:
00182       return conn->second.second;
00183    }
00184    
00185    // If the server is not even connected, then it is definitely not configured:
00186    REPORT_ERROR( "Asking about a server that's not yet connected (\""
00187                  << url << "\", \"" << param << "\")" );
00188    return kFALSE;
00189 }
00190 
00201 void SProofManager::SetConfigured( const TString& url,
00202                                    const TString& param,
00203                                    Bool_t state ) throw( SError ) {
00204 
00205    // Make sure the connection is open. This call can throw an error
00206    // if unsuccessful, so no point in checking its return value.
00207    Open( url, param );
00208 
00209    // Now find it in our internal cache:
00210    ConnMap_t::key_type connection = std::make_pair( url, param );
00211    ConnMap_t::iterator conn = m_connections.find( connection );
00212    if( conn == m_connections.end() ) {
00213       REPORT_FATAL( "Internal logic error discovered" );
00214       throw SError( "Internal logic error discovered",
00215                     SError::StopExecution );
00216    }
00217 
00218    // Update the state:
00219    conn->second.second = state;
00220 
00221    return;
00222 }
00223 
00233 void SProofManager::Cleanup() {
00234 
00235    PrintWorkerLogs();
00236 
00237    if( m_connections.size() ) {
00238       TProofMgr* mgr = m_connections.begin()->second.first->GetManager();
00239       for( ConnMap_t::iterator server = m_connections.begin();
00240            server != m_connections.end(); ++server ) {
00241          delete server->second.first;
00242       }
00243       delete mgr;
00244    }
00245    m_connections.clear();
00246 
00247    return;
00248 }
00249 
00254 SProofManager::SProofManager()
00255    : m_connections(), m_logger( "SProofManager" ) {
00256 
00257 }
00258 
00264 void SProofManager::PrintWorkerLogs() const {
00265 
00266    //
00267    // Loop over all the connections:
00268    //
00269    for( ConnMap_t::const_iterator server = m_connections.begin();
00270         server != m_connections.end(); ++server ) {
00271 
00272       //
00273       // Message identifying the server:
00274       //
00275       m_logger << INFO
00276                << "***************************************************************"
00277                << SLogger::endmsg;
00278       m_logger << INFO << "*" << SLogger::endmsg;
00279       m_logger << INFO << "* Printing all worker logs from server:"
00280                << SLogger::endmsg;
00281       m_logger << INFO << "*     " << server->first.first << SLogger::endmsg;
00282       m_logger << INFO << "*" << SLogger::endmsg;
00283       m_logger << INFO
00284                << "***************************************************************"
00285                << SLogger::endmsg;
00286 
00287       //
00288       // Get info about the slaves:
00289       //
00290       TList* slaveInfos = server->second.first->GetListOfSlaveInfos();
00291 
00292       //
00293       // Retrieve all logs:
00294       //
00295       TProofLog* log = server->second.first->GetManager()->GetSessionLogs();
00296       TList* logList = log->GetListOfLogs();
00297       for( Int_t i = 0; i < logList->GetSize(); ++i ) {
00298 
00299          //
00300          // Access the log of a single node:
00301          //
00302          TProofLogElem* element = dynamic_cast< TProofLogElem* >( logList->At( i ) );
00303          if( ! element ) {
00304             REPORT_ERROR( "Log element not recognised!" );
00305             continue;
00306          }
00307 
00308          //
00309          // Find "the name" of the node. TProofLogElem objects only know that they
00310          // came from node "0.2" for instance. This small loop matches these
00311          // identifiers to the proper node names in the slaveInfos list.
00312          //
00313          // If the identifier is not found in the list, then it has to be the master:
00314          TString nodeName = server->second.first->GetMaster();
00315          for( Int_t i = 0; i < slaveInfos->GetSize(); ++i ) {
00316 
00317             // Access the TSlaveInfo object:
00318             TSlaveInfo* info = dynamic_cast< TSlaveInfo* >( slaveInfos->At( i ) );
00319             if( ! info ) {
00320                REPORT_ERROR( "Couldn't use a TSlaveInfo object!" );
00321                continue;
00322             }
00323             // Check if this TSlaveInfo describes the source of the log:
00324             if( ! strcmp( element->GetName(), info->GetOrdinal() ) ) {
00325                nodeName = info->GetName();
00326                break;
00327             }
00328          }
00329 
00330          //
00331          // Print the log. Note that we don't need to redirect the log lines
00332          // to m_logger. The log lines of the nodes will already be formatted, so
00333          // printing them through SLogger would just look ugly.
00334          //
00335          m_logger << INFO << "=================================================="
00336                   << SLogger::endmsg;
00337          m_logger << INFO << "Output from node: " << nodeName << " ("
00338                   << element->GetName() << ")" << SLogger::endmsg;
00339 
00340          element->GetMacro()->Print();
00341 
00342          m_logger << INFO << "=================================================="
00343                   << SLogger::endmsg;
00344 
00345       }
00346 
00347       // It's up to us to delete the TProofLog object:
00348       delete log;
00349    }
00350 
00351    return;
00352 }