SFrame 3.6
|
00001 // $Id: SProofManager.cxx 318 2012-06-22 16:32:42Z krasznaa $ 00002 /*************************************************************************** 00003 * @Project: SFrame - ROOT-based analysis framework for ATLAS 00004 * @Package: Core 00005 * 00006 * @author Stefan Ask <Stefan.Ask@cern.ch> - Manchester 00007 * @author David Berge <David.Berge@cern.ch> - CERN 00008 * @author Johannes Haller <Johannes.Haller@cern.ch> - Hamburg 00009 * @author A. Krasznahorkay <Attila.Krasznahorkay@cern.ch> - CERN/Debrecen 00010 * 00011 ***************************************************************************/ 00012 00013 // ROOT include(s): 00014 #include <TProof.h> 00015 #include <TProofMgr.h> 00016 #include <TProofLog.h> 00017 #include <TMacro.h> 00018 #include <TList.h> 00019 #include <TObjArray.h> 00020 #include <TObjString.h> 00021 00022 // Local include(s): 00023 #include "../include/SProofManager.h" 00024 00025 // Initialize the static variable: 00026 SProofManager* SProofManager::m_instance = 0; 00027 00032 SProofManager::~SProofManager() { 00033 00034 Cleanup(); 00035 } 00036 00042 SProofManager* SProofManager::Instance() { 00043 00044 if( ! m_instance ) { 00045 m_instance = new SProofManager(); 00046 } 00047 00048 return m_instance; 00049 } 00050 00064 TProof* SProofManager::Open( const TString& url, 00065 const TString& param ) throw( SError ) { 00066 00067 // Copy the contents of the parameters, as we may have to change them: 00068 TString urlcopy( url ); 00069 TString paramcopy( param ); 00070 00071 // The user may specify some special additional tags in the URL string, 00072 // which have to be searched for here: 00073 TObjArray* tokens = url.Tokenize( ";" ); 00074 if( tokens->GetEntries() > 1 ) { 00075 TObjString* token = dynamic_cast< TObjString* >( tokens->At( 0 ) ); 00076 if( ! token ) { 00077 REPORT_ERROR( "The tokenized array contains something that's not a " 00078 "TObjString!" ); 00079 delete tokens; 00080 throw SError( "Problems with tokenizing PROOF URL", 00081 SError::SkipCycle ); 00082 } 00083 urlcopy = token->GetString(); 00084 } 00085 // We override the parameters given to the function if extra tokens are 00086 // found. Then again, the SFrame code never gives anything as "param" to the 00087 // function anyway. (May change at one point.) 00088 if( ( tokens->GetEntries() > 1 ) && ( paramcopy != "" ) ) { 00089 m_logger << WARNING << "Extra parameters provided both in the URL and " 00090 << "the extra parameters field. Using the one(s) from the " 00091 << "URL field." << SLogger::endmsg; 00092 paramcopy = ""; 00093 } 00094 for( Int_t i = 1; i < tokens->GetEntries(); ++i ) { 00095 TObjString* tokenobj = dynamic_cast< TObjString* >( tokens->At( i ) ); 00096 if( ! tokenobj ) { 00097 REPORT_ERROR( "The tokenized array contains something that's not a " 00098 "TObjString!" ); 00099 delete tokens; 00100 throw SError( "Problems with tokenizing PROOF URL", 00101 SError::SkipCycle ); 00102 } 00103 const TString token = tokenobj->GetString(); 00104 // Did the user ask for the memory-leak profiling of the PROOF master? 00105 if( token == "MemProfMaster" ) { 00106 m_logger << INFO << "Running memory profiling on the master node" 00107 << SLogger::endmsg; 00108 paramcopy = "valgrind=master"; 00109 TProof::AddEnvVar( "PROOF_MASTER_WRAPPERCMD", 00110 "valgrind_opts:--leak-check=full " 00111 "--track-origins=yes --num-callers=32" ); 00112 } 00113 // Did the user ask for the memory-leak profiling of the PROOF workers? 00114 else if( token == "MemProfWorkers" ) { 00115 m_logger << INFO << "Running memory profiling on the worker nodes" 00116 << SLogger::endmsg; 00117 paramcopy = "valgrind=workers"; 00118 TProof::AddEnvVar( "PROOF_SLAVE_WRAPPERCMD", 00119 "valgrind_opts:--leak-check=full " 00120 "--track-origins=yes --num-callers=32" ); 00121 } else { 00122 REPORT_ERROR( "Unknown extra parameter specified: " << token ); 00123 } 00124 // Extend the memory available to the PROOF processes in all memory 00125 // profiling jobs: 00126 if( token.BeginsWith( "MemProf" ) ) { 00127 // This should make sure that at least 10 GBs are available to the 00128 // process: 00129 TProof::AddEnvVar( "PROOF_RESMEMMAX", "10000" ); 00130 TProof::AddEnvVar( "PROOF_VIRTMEMMAX", "10000" ); 00131 } 00132 } 00133 REPORT_VERBOSE( "Using URL: " << urlcopy << ", Param: " << paramcopy ); 00134 // Clean up: 00135 delete tokens; 00136 00137 // Check if the connection has already been opened. Notice that we're 00138 // using the original URL and parameters here. 00139 const ConnMap_t::key_type connection = std::make_pair( url, param ); 00140 ConnMap_t::const_iterator conn; 00141 if( ( conn = m_connections.find( connection ) ) != m_connections.end() ) { 00142 m_logger << DEBUG << "Connection to \"" << url << "\" is already open" 00143 << SLogger::endmsg; 00144 return conn->second.first; 00145 } 00146 00147 // Try to open the connection: 00148 TProof* server = TProof::Open( urlcopy, paramcopy ); 00149 if( ! server ) { 00150 REPORT_ERROR( "Couldn't open connection to: " << url ); 00151 throw SError( "Couldn't open connection to: " + url, 00152 SError::SkipCycle ); 00153 } else { 00154 m_logger << INFO << "Connection opened to \"" << url << "\"" 00155 << SLogger::endmsg; 00156 } 00157 00158 // Remember that the server is connected, but not initialized yet: 00159 m_connections[ connection ] = std::make_pair( server, kFALSE ); 00160 00161 return server; 00162 } 00163 00174 Bool_t SProofManager::IsConfigured( const TString& url, 00175 const TString& param ) const { 00176 00177 // Check if the connection has already been opened: 00178 ConnMap_t::key_type connection = std::make_pair( url, param ); 00179 ConnMap_t::const_iterator conn = m_connections.find( connection ); 00180 if( conn != m_connections.end() ) { 00181 // Return the configuration state: 00182 return conn->second.second; 00183 } 00184 00185 // If the server is not even connected, then it is definitely not configured: 00186 REPORT_ERROR( "Asking about a server that's not yet connected (\"" 00187 << url << "\", \"" << param << "\")" ); 00188 return kFALSE; 00189 } 00190 00201 void SProofManager::SetConfigured( const TString& url, 00202 const TString& param, 00203 Bool_t state ) throw( SError ) { 00204 00205 // Make sure the connection is open. This call can throw an error 00206 // if unsuccessful, so no point in checking its return value. 00207 Open( url, param ); 00208 00209 // Now find it in our internal cache: 00210 ConnMap_t::key_type connection = std::make_pair( url, param ); 00211 ConnMap_t::iterator conn = m_connections.find( connection ); 00212 if( conn == m_connections.end() ) { 00213 REPORT_FATAL( "Internal logic error discovered" ); 00214 throw SError( "Internal logic error discovered", 00215 SError::StopExecution ); 00216 } 00217 00218 // Update the state: 00219 conn->second.second = state; 00220 00221 return; 00222 } 00223 00233 void SProofManager::Cleanup() { 00234 00235 PrintWorkerLogs(); 00236 00237 if( m_connections.size() ) { 00238 TProofMgr* mgr = m_connections.begin()->second.first->GetManager(); 00239 for( ConnMap_t::iterator server = m_connections.begin(); 00240 server != m_connections.end(); ++server ) { 00241 delete server->second.first; 00242 } 00243 delete mgr; 00244 } 00245 m_connections.clear(); 00246 00247 return; 00248 } 00249 00254 SProofManager::SProofManager() 00255 : m_connections(), m_logger( "SProofManager" ) { 00256 00257 } 00258 00264 void SProofManager::PrintWorkerLogs() const { 00265 00266 // 00267 // Loop over all the connections: 00268 // 00269 for( ConnMap_t::const_iterator server = m_connections.begin(); 00270 server != m_connections.end(); ++server ) { 00271 00272 // 00273 // Message identifying the server: 00274 // 00275 m_logger << INFO 00276 << "***************************************************************" 00277 << SLogger::endmsg; 00278 m_logger << INFO << "*" << SLogger::endmsg; 00279 m_logger << INFO << "* Printing all worker logs from server:" 00280 << SLogger::endmsg; 00281 m_logger << INFO << "* " << server->first.first << SLogger::endmsg; 00282 m_logger << INFO << "*" << SLogger::endmsg; 00283 m_logger << INFO 00284 << "***************************************************************" 00285 << SLogger::endmsg; 00286 00287 // 00288 // Get info about the slaves: 00289 // 00290 TList* slaveInfos = server->second.first->GetListOfSlaveInfos(); 00291 00292 // 00293 // Retrieve all logs: 00294 // 00295 TProofLog* log = server->second.first->GetManager()->GetSessionLogs(); 00296 TList* logList = log->GetListOfLogs(); 00297 for( Int_t i = 0; i < logList->GetSize(); ++i ) { 00298 00299 // 00300 // Access the log of a single node: 00301 // 00302 TProofLogElem* element = dynamic_cast< TProofLogElem* >( logList->At( i ) ); 00303 if( ! element ) { 00304 REPORT_ERROR( "Log element not recognised!" ); 00305 continue; 00306 } 00307 00308 // 00309 // Find "the name" of the node. TProofLogElem objects only know that they 00310 // came from node "0.2" for instance. This small loop matches these 00311 // identifiers to the proper node names in the slaveInfos list. 00312 // 00313 // If the identifier is not found in the list, then it has to be the master: 00314 TString nodeName = server->second.first->GetMaster(); 00315 for( Int_t i = 0; i < slaveInfos->GetSize(); ++i ) { 00316 00317 // Access the TSlaveInfo object: 00318 TSlaveInfo* info = dynamic_cast< TSlaveInfo* >( slaveInfos->At( i ) ); 00319 if( ! info ) { 00320 REPORT_ERROR( "Couldn't use a TSlaveInfo object!" ); 00321 continue; 00322 } 00323 // Check if this TSlaveInfo describes the source of the log: 00324 if( ! strcmp( element->GetName(), info->GetOrdinal() ) ) { 00325 nodeName = info->GetName(); 00326 break; 00327 } 00328 } 00329 00330 // 00331 // Print the log. Note that we don't need to redirect the log lines 00332 // to m_logger. The log lines of the nodes will already be formatted, so 00333 // printing them through SLogger would just look ugly. 00334 // 00335 m_logger << INFO << "==================================================" 00336 << SLogger::endmsg; 00337 m_logger << INFO << "Output from node: " << nodeName << " (" 00338 << element->GetName() << ")" << SLogger::endmsg; 00339 00340 element->GetMacro()->Print(); 00341 00342 m_logger << INFO << "==================================================" 00343 << SLogger::endmsg; 00344 00345 } 00346 00347 // It's up to us to delete the TProofLog object: 00348 delete log; 00349 } 00350 00351 return; 00352 }