From da7c5738431bdf7af42a473112ec1450bd36dd07 Mon Sep 17 00:00:00 2001 From: P-A Loizeau <p.-a.loizeau@gsi.de> Date: Thu, 2 Jul 2020 16:10:29 +0200 Subject: [PATCH] MQ: in TsaMultiSampler source class, add optional support for emission of a missed TS indices list and EOF command + prepare histos support --- MQ/source/CMakeLists.txt | 2 + MQ/source/CbmMQTsaMultiSampler.cxx | 293 ++++++++++++++++++++++++++++- MQ/source/CbmMQTsaMultiSampler.h | 34 +++- MQ/source/runTsaMultiSampler.cxx | 13 ++ 4 files changed, 332 insertions(+), 10 deletions(-) diff --git a/MQ/source/CMakeLists.txt b/MQ/source/CMakeLists.txt index 63bbfa08ee..f095b40e90 100644 --- a/MQ/source/CMakeLists.txt +++ b/MQ/source/CMakeLists.txt @@ -4,6 +4,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQSampler.sh.in ${CMAKE_BINARY_D set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_SOURCE_DIR}/MQ/base + ${CBMROOT_SOURCE_DIR}/fles/flestools ${CBMBASE_DIR} ${CBMDATA_DIR} ${CBMDATA_DIR}/base @@ -84,6 +85,7 @@ set(DEPENDENCIES ${BOOST_LIBS} fles_ipc CbmMQBase + CbmFlibFlesTools ) GENERATE_EXECUTABLE() diff --git a/MQ/source/CbmMQTsaMultiSampler.cxx b/MQ/source/CbmMQTsaMultiSampler.cxx index 68cbf3c5bd..a83b54a342 100644 --- a/MQ/source/CbmMQTsaMultiSampler.cxx +++ b/MQ/source/CbmMQTsaMultiSampler.cxx @@ -9,9 +9,12 @@ #include "CbmMQTsaMultiSampler.h" #include "CbmMQDefs.h" +//#include "CbmFlesCanvasTools.h" +#include "CbmFormatDecHexPrintout.h" #include "FairMQLogger.h" #include "FairMQProgOptions.h" // device->fConfig +//#include "RootSerializer.h" #include "TimesliceSubscriber.hpp" #include "TimesliceMultiSubscriber.hpp" @@ -50,7 +53,6 @@ CbmMQTsaMultiSampler::CbmMQTsaMultiSampler() , fHost("") , fPort(0) , fHighWaterMark(1) - , fTSNumber(0) , fTSCounter(0) , fMessageCounter(0) , fSource(nullptr) @@ -71,7 +73,16 @@ try fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts"); fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid"); fbSendTsPerChannel = fConfig->GetValue<bool>("send-ts-per-channel"); - + fsChannelNameMissedTs = fConfig->GetValue< std::string >( "ChNameMissTs" ); + fsChannelNameCommands = fConfig->GetValue< std::string >( "ChNameCmds" ); +/* + fuPublishFreqTs = fConfig->GetValue< uint32_t >( "PubFreqTs" ); + fdMinPublishTime = fConfig->GetValue< double_t >( "PubTimeMin" ); + fdMaxPublishTime = fConfig->GetValue< double_t >( "PubTimeMax" ); + fsChannelNameHistosInput = fConfig->GetValue< std::string >( "ChNameIn" ); + fsChannelNameHistosConfig = fConfig->GetValue< std::string >( "ChNameHistCfg" ); + fsChannelNameCanvasConfig = fConfig->GetValue< std::string >( "ChNameCanvCfg" ); +*/ if( fbNoSplitTs ) { if( fbSendTsPerSysId ) @@ -96,6 +107,7 @@ try << " second one will be ignored!!!!"; } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs ) + /// Extract SysId and channel information if provided in the binary options std::vector< std::string > vSysIdChanPairs = fConfig->GetValue< std::vector< std::string > >("sysid-chan"); for( uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair ) { @@ -180,6 +192,12 @@ try int noChannel = fChannels.size(); LOG(info) << "Number of defined output channels: " << noChannel; for(auto const &entry : fChannels) { + /// Catches and ignores the channels for missing TS indices and commands + if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands ) + { + continue; + } // if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands ) + LOG(info) << "Channel name: " << entry.first; if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); } @@ -232,7 +250,75 @@ try { LOG(info) << "Sending components in separate TS per channel"; } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs ) +/* + LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; + LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; + LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; +*/ + +/* + /// Histos creation and obtain pointer on them + /// Trigger histo creation on all associated algos + initOK &= CreateHistograms(); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + { +// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() +// << " in " << vHistos[ uHisto ].second.data() +// ; + fArrayHisto.Add( vHistos[ uHisto ].first ); + std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(), + vHistos[ uHisto ].second ); + fvpsHistosFolder.push_back( psHistoConfig ); + + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist( NewMessage() ); + Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig ); + + /// Send message to the common histogram config messages queue + if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) + { + LOG(error) << "Problem sending histo config"; + return false; + } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) + + LOG(info) << "Config of hist " << psHistoConfig.first.data() + << " in folder " << psHistoConfig.second.data() ; + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + { +// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() +// << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[ uCanv ].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first ); + + std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf ); + + fvpsCanvasConfig.push_back( psCanvConfig ); + + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan( NewMessage() ); + Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig ); + + /// Send message to the common canvas config messages queue + if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) + { + LOG(error) << "Problem sending canvas config"; + return false; + } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() + << " is " << psCanvConfig.second.data() ; + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) +*/ fTime = std::chrono::steady_clock::now(); } catch (InitTaskError& e) { @@ -243,6 +329,14 @@ try bool CbmMQTsaMultiSampler::IsChannelNameAllowed(std::string channelName) { + /// If sending full TS, accept any name! + if( fbNoSplitTs ) + { + fComponentsToSend[0]++; + fChannelsToSend[0].push_back(channelName); + return true; + } // if( fbNoSplitTs ) + bool bFoundMatch = false; // for(auto const &entry : fAllowedChannels) { for( uint32_t idx = 0; idx < fAllowedChannels.size(); ++idx ) @@ -287,13 +381,41 @@ bool CbmMQTsaMultiSampler::ConditionalRun() if (timeslice) { if (fTSCounter < fMaxTimeslices) { fTSCounter++; - if (fTSCounter % 10000 == 0) { - LOG(info) << "Analyse Event " << fTSCounter; - } const fles::Timeslice& ts = *timeslice; -// auto tsIndex = ts.index(); + uint64_t uTsIndex = ts.index(); + + /// Missed TS detection (only if output channel name defined by user) + if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && + ( 0 != fuPrevTsIndex && 0 != uTsIndex ) && + "" != fsChannelNameMissedTs ) + { + LOG(debug) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex + << " New TS Index is " << uTsIndex; + /// Add missing TS indices to a vector and send it in appropriate channel + std::vector< uint64_t > vulMissedIndices; + for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) + { + vulMissedIndices.emplace_back( ulMiss ); + } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) + if( !SendMissedTsIdx( vulMissedIndices ) ) + { + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) ); + SendCommand( "STOP" ); + } // if( "" != fsChannelNameCommands ) + + return false; + } // if( !SendMissedTsIdx( vulMissedIndices ) ) + } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && ( 0 != fuPrevTsIndex && 0 != uTsIndex ) && "" != fsChannelNameMissedTs ) + fuPrevTsIndex = uTsIndex; + if (fTSCounter % 10000 == 0) { + LOG(info) << "Received TS " << fTSCounter << " with index " << uTsIndex ; + } LOG(debug) << "Found " << ts.num_components() << " different components in timeslice"; @@ -306,39 +428,117 @@ bool CbmMQTsaMultiSampler::ConditionalRun() /// This is a special case for the TOF + T0 /// => Inefficient as copy the TS as many times as need! if( !CreateAndSendFullTs( ts ) ) + { + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) ); + SendCommand( "STOP" ); + } // if( "" != fsChannelNameCommands ) + return false; + } // if( !CreateAndSendFullTs( ts ) ) } // if( fbNoSplitTs ) else if( fbSendTsPerSysId ) { /// This assumes that the order of the components does NOT change after the first TS /// That should be the case as the component index correspond to a physical link idx if( !CreateAndCombineComponentsPerSysId( ts ) ) + { + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) ); + SendCommand( "STOP" ); + } // if( "" != fsChannelNameCommands ) + return false; + } // if( !CreateAndCombineComponentsPerSysId( ts ) ) } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs ) else if( fbSendTsPerChannel ) { /// This assumes that the order of the components does NOT change after the first TS /// That should be the case as the component index correspond to a physical link idx if( !CreateAndCombineComponentsPerChannel( ts ) ) + { + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) ); + SendCommand( "STOP" ); + } // if( "" != fsChannelNameCommands ) + return false; + } // if( !CreateAndCombineComponentsPerChannel( ts ) ) } // else if( fbSendTsPerChannel ) of if( fbSendTsPerSysId ) else { for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) { if( !CreateAndSendComponent(ts, nrComp) ) + { + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) ); + SendCommand( "STOP" ); + } // if( "" != fsChannelNameCommands ) + return false; - } + } // if( !CreateAndSendComponent(ts, nrComp) ) + } // for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) } // else of if( fbSendTsPerSysId ) return true; } else { CalcRuntime(); + + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending an EOF to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); + std::string sCmd = "EOF "; + sCmd += FormatDecPrintout( fuPrevTsIndex ); + sCmd += " "; + sCmd += FormatDecPrintout( fTSCounter ); + SendCommand( sCmd ); + } // if( "" != fsChannelNameCommands ) + return false; } } else { CalcRuntime(); + + /// If command channel defined, send command to all "slaves" + if( "" != fsChannelNameCommands ) + { + /// Wait 1 s before sending an EOF to let all slaves finish processing previous data + std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); + std::string sCmd = "EOF "; + sCmd += FormatDecPrintout( fuPrevTsIndex ); + sCmd += " "; + sCmd += FormatDecPrintout( fTSCounter ); + SendCommand( sCmd ); + } // if( "" != fsChannelNameCommands ) + return false; } - +/* + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if( ( fdMaxPublishTime < elapsedSeconds.count() ) || + ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + { + SendHistograms(); + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) +*/ } bool CbmMQTsaMultiSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp) @@ -652,7 +852,84 @@ bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, st return true; } +bool CbmMQTsaMultiSampler::SendMissedTsIdx( std::vector< uint64_t > vIndices ) +{ + // serialize the vector and create the message + std::stringstream oss; + boost::archive::binary_oarchive oa(oss); + oa << vIndices; + std::string* strMsg = new std::string(oss.str()); + + FairMQMessagePtr msg( NewMessage( const_cast<char*>( strMsg->c_str() ), // data + strMsg->length(), // size + []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); }, + strMsg ) ); // object that manages the data + + // in case of error or transfer interruption, + // return false to go to IDLE state + // successfull transfer will return number of bytes + // transfered (can be 0 if sending an empty message). + LOG(debug) << "Send data to channel " << fsChannelNameMissedTs; + if( Send( msg, fsChannelNameMissedTs ) < 0 ) + { + LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs; + return false; + } // if( Send( msg, fsChannelNameMissedTs ) < 0 ) + return true; +} +bool CbmMQTsaMultiSampler::SendCommand( std::string sCommand ) +{ + // serialize the vector and create the message + std::stringstream oss; + boost::archive::binary_oarchive oa(oss); + oa << sCommand; + std::string* strMsg = new std::string(oss.str()); + + FairMQMessagePtr msg( NewMessage( const_cast<char*>( strMsg->c_str() ), // data + strMsg->length(), // size + []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); }, + strMsg ) ); // object that manages the data + +// FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data +// sCommand.length(), // size +// []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); }, +// &sCommand ) ); // object that manages the data + + // in case of error or transfer interruption, + // return false to go to IDLE state + // successfull transfer will return number of bytes + // transfered (can be 0 if sending an empty message). + LOG(debug) << "Send data to channel " << fsChannelNameCommands; + if( Send( msg, fsChannelNameCommands ) < 0 ) + { + LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands; + return false; + } // if( Send( msg, fsChannelNameMissedTs ) < 0 ) + + return true; +} + +/* +bool CbmMQTsaMultiSampler::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message( NewMessage() ); + Serialize<RootSerializer>( *message, &fArrayHisto ); + + /// Send message to the common histogram messages queue + if( Send( message, fsChannelNameHistosInput ) < 0 ) + { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + ResetHistograms(); + + return true; +} +*/ CbmMQTsaMultiSampler::~ CbmMQTsaMultiSampler() { diff --git a/MQ/source/CbmMQTsaMultiSampler.h b/MQ/source/CbmMQTsaMultiSampler.h index 7cef56b6cf..bdb517b53d 100644 --- a/MQ/source/CbmMQTsaMultiSampler.h +++ b/MQ/source/CbmMQTsaMultiSampler.h @@ -41,8 +41,15 @@ class CbmMQTsaMultiSampler : public FairMQDevice bool fbNoSplitTs = false; bool fbSendTsPerSysId = false; bool fbSendTsPerChannel = false; - - uint64_t fTSNumber; +/* + std::string fsChannelNameHistosInput = "histogram-in"; + std::string fsChannelNameHistosConfig = "histo-conf"; + std::string fsChannelNameCanvasConfig = "canvas-conf"; + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5; +*/ + uint64_t fuPrevTsIndex = 0; uint64_t fTSCounter; uint64_t fMessageCounter; @@ -65,6 +72,11 @@ class CbmMQTsaMultiSampler : public FairMQDevice bool CreateAndSendFullTs( const fles::Timeslice& ); bool SendData(const fles::StorableTimeslice&, int); bool SendData(const fles::StorableTimeslice&, std::string); + bool SendMissedTsIdx( std::vector< uint64_t > vIndices ); + bool SendCommand( std::string sCommand ); +/* + bool SendHistograms(); +*/ fles::TimesliceSource* fSource; //! std::chrono::steady_clock::time_point fTime; @@ -96,6 +108,24 @@ class CbmMQTsaMultiSampler : public FairMQDevice bool fbListCompPerChannelReady = false; std::vector< std::string > fvChannelsToSend= {}; std::vector< std::vector< uint32_t > > fvvCompPerChannel= {}; + + std::string fsChannelNameMissedTs = ""; + std::string fsChannelNameCommands = ""; +/* + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector(); + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector(); + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector< std::pair< std::string, std::string > > fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig = {}; +*/ }; #endif /* CBMMQTSASAMPLER_H_ */ diff --git a/MQ/source/runTsaMultiSampler.cxx b/MQ/source/runTsaMultiSampler.cxx index caeacd6d53..dbb5941924 100644 --- a/MQ/source/runTsaMultiSampler.cxx +++ b/MQ/source/runTsaMultiSampler.cxx @@ -16,6 +16,19 @@ void addCustomOptions(bpo::options_description& options) ("send-ts-per-channel", bpo::value<bool>()->default_value(0), "Send a single TS per channel with all matching components") ("sysid-chan", bpo::value< std::vector< std::string > >(), "Pair a SysId in hex + channel name, separated by :, unique SysId!") ("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running"); + + options.add_options() ( "ChNameMissTs", bpo::value< std::string >()->default_value( "" ), + "MQ channel name for missed TS indices"); + options.add_options() ( "ChNameCmds", bpo::value< std::string >()->default_value( "" ), + "MQ channel name for commands to slaves"); +/* + options.add_options() ( "ChNameIn", bpo::value< std::string >()->default_value( "histogram-in" ), + "MQ channel name for histos"); + options.add_options() ( "ChNameHistCfg", bpo::value< std::string >()->default_value( "histo-conf" ), + "MQ channel name for histos config"); + options.add_options() ( "ChNameCanvCfg", bpo::value< std::string >()->default_value( "canvas-conf" ), + "MQ channel name for canvases config"); +*/ } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -- GitLab