diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx index a73da1f82248464ad72fd95c72a17ff4e66d55ef..6b4f0bc1ee2f1079341bd2a6758fc24398e4ae19 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx @@ -68,12 +68,10 @@ try { fvsSetTrigWin = fConfig->GetValue<std::vector<std::string>>("SetTrigWin"); fvsSetTrigMinNb = fConfig->GetValue<std::vector<std::string>>("SetTrigMinNb"); - fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); - fsChannelNameDataOutput = fConfig->GetValue<std::string>("EvtNameOut"); - fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); - fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg"); - fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg"); - fsAllowedChannels[0] = fsChannelNameDataInput; + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsChannelNameDataOutput = fConfig->GetValue<std::string>("EvtNameOut"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + fsAllowedChannels[0] = fsChannelNameDataInput; fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); @@ -296,58 +294,12 @@ try { fpAlgo->SetTimeSliceMetaDataArray(fTimeSliceMetaDataArray); /// Now that everything is set, initialize the Algorithm - if (kFALSE == fpAlgo->InitAlgo()) { throw InitTaskError("Failed to initilize the algorithm class."); } + if (kFALSE == fpAlgo->InitAlgo()) { throw InitTaskError("Failed to initialize the algorithm class."); } /// Histograms management if (kTRUE == fbFillHistos) { - /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) - std::vector<std::pair<TNamed*, std::string>> vHistos = fpAlgo->GetHistoVector(); - /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) - std::vector<std::pair<TCanvas*, std::string>> vCanvases = fpAlgo->GetCanvasVector(); - - /// Add pointers to each histo in the histo array - /// Create histo config vector - /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > - /// and send it through a separate channel using the BoostSerializer - for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { - // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() - // << " in " << vHistos[ uHisto ].second.data() - // ; - fArrayHisto.Add(vHistos[uHisto].first); - std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); - fvpsHistosFolder.push_back(psHistoConfig); - - /// Serialize the vector of histo config into a single MQ message - FairMQMessagePtr messageHist(NewMessage()); - Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig); - - /// Send message to the common histogram config messages queue - if (Send(messageHist, fsChannelNameHistosConfig) < 0) { throw InitTaskError("Problem sending histo config"); } - LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); - } - - /// Create canvas config vector - /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > - /// and send it through a separate channel using the BoostSerializer - for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { - // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() - // << " in " << vCanvases[ uCanv ].second.data(); - std::string sCanvName = (vCanvases[uCanv].first)->GetName(); - std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); - - std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); - - fvpsCanvasConfig.push_back(psCanvConfig); - - /// Serialize the vector of canvas config into a single MQ message - FairMQMessagePtr messageCan(NewMessage()); - Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig); - - /// Send message to the common canvas config messages queue - if (Send(messageCan, fsChannelNameCanvasConfig) < 0) { throw InitTaskError("Problem sending canvas config"); } - - LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); - } + /// Comment to prevent clang format single lining + if (kFALSE == InitHistograms()) { throw InitTaskError("Failed to initialize the histograms."); } } } catch (InitTaskError& e) { @@ -374,6 +326,50 @@ bool CbmDeviceBuildDigiEvents::IsChannelNameAllowed(std::string channelName) return false; } + +bool CbmDeviceBuildDigiEvents::InitHistograms() +{ + bool initOK = true; + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + std::vector<std::pair<TNamed*, std::string>> vHistos = fpAlgo->GetHistoVector(); + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + std::vector<std::pair<TCanvas*, std::string>> vCanvases = fpAlgo->GetCanvasVector(); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/) { @@ -488,9 +484,15 @@ bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/) std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; if ((fdMaxPublishTime < elapsedSeconds.count()) || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { - SendHistograms(); + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + fLastPublishTime = std::chrono::system_clock::now(); - } + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) } return true; @@ -542,6 +544,50 @@ bool CbmDeviceBuildDigiEvents::SendEvents(FairMQParts& partsIn) return true; } +bool CbmDeviceBuildDigiEvents::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + fpAlgo->ResetHistograms(kFALSE); + + return true; +} + bool CbmDeviceBuildDigiEvents::SendHistograms() { /// Serialize the array of histos into a single MQ message @@ -552,7 +598,7 @@ bool CbmDeviceBuildDigiEvents::SendHistograms() if (Send(message, fsChannelNameHistosInput) < 0) { LOG(error) << "Problem sending data"; return false; - } + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) /// Reset the histograms after sending them (but do not reset the time) fpAlgo->ResetHistograms(kFALSE); diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.h b/MQ/mcbm/CbmDeviceBuildDigiEvents.h index 4c911765f1ee11f403b26d43a8bbe488eb428324..3dc2afbfbfeb721cca705fa7b603abea1704e45a 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.h +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.h @@ -64,12 +64,10 @@ private: std::vector<std::string> fvsSetTrigWin = {}; std::vector<std::string> fvsSetTrigMinNb = {}; /// message queues - std::string fsChannelNameDataInput = "unpts_0"; - std::string fsChannelNameDataOutput = "events"; - std::string fsChannelNameCommands = "commands"; - std::string fsChannelNameHistosInput = "histogram-in"; - std::string fsChannelNameHistosConfig = "histo-conf"; - std::string fsChannelNameCanvasConfig = "canvas-conf"; + std::string fsChannelNameDataInput = "unpts_0"; + std::string fsChannelNameDataOutput = "events"; + std::string fsChannelNameCommands = "commands"; + std::string fsChannelNameHistosInput = "histogram-in"; /// Histograms management uint32_t fuPublishFreqTs = 100; double_t fdMinPublishTime = 0.5; @@ -117,10 +115,14 @@ private: /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; bool IsChannelNameAllowed(std::string channelName); + bool InitHistograms(); void Finish(); bool SendEvents(FairMQParts& partsIn); + bool SendHistoConfAndData(); bool SendHistograms(); }; diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.cxx b/MQ/mcbm/CbmDeviceDigiEventSink.cxx index 21b771f612a019600e309726c75fd0d1960196fc..744b73158c60f38251faca95ca26b0f9a56fe9e9 100644 --- a/MQ/mcbm/CbmDeviceDigiEventSink.cxx +++ b/MQ/mcbm/CbmDeviceDigiEventSink.cxx @@ -69,13 +69,11 @@ try { fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn"); fsAllowedChannels[0] = fsChannelNameDataInput; - fbFillHistos = fConfig->GetValue<bool>("FillHistos"); - fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); - fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg"); - fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg"); - fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); - fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); - fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fbFillHistos = fConfig->GetValue<bool>("FillHistos"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); /// Associate the MissedTs Channel to the corresponding handler OnData(fsChannelNameMissedTs, &CbmDeviceDigiEventSink::HandleMissTsData); @@ -165,68 +163,8 @@ try { /// Histograms management if (kTRUE == fbFillHistos) { - /* - /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) - std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector(); - /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) - std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector(); - - /// Add pointers to each histo in the histo array - /// Create histo config vector - /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > - /// and send it through a separate channel using the BoostSerializer - for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) - { -// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() -// << " in " << vHistos[ uHisto ].second.data() -// ; - fArrayHisto.Add( vHistos[ uHisto ].first ); - std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(), - vHistos[ uHisto ].second ); - fvpsHistosFolder.push_back( psHistoConfig ); - - /// Serialize the vector of histo config into a single MQ message - FairMQMessagePtr messageHist( NewMessage() ); - Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig ); - - /// Send message to the common histogram config messages queue - if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) - { - throw InitTaskError( "Problem sending histo config" ); - } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) - - LOG(info) << "Config of hist " << psHistoConfig.first.data() - << " in folder " << psHistoConfig.second.data() ; - } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) - - /// Create canvas config vector - /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > - /// and send it through a separate channel using the BoostSerializer - for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) - { -// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() -// << " in " << vCanvases[ uCanv ].second.data(); - std::string sCanvName = (vCanvases[ uCanv ].first)->GetName(); - std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first ); - - std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf ); - - fvpsCanvasConfig.push_back( psCanvConfig ); - - /// Serialize the vector of canvas config into a single MQ message - FairMQMessagePtr messageCan( NewMessage() ); - Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig ); - - /// Send message to the common canvas config messages queue - if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) - { - throw InitTaskError( "Problem sending canvas config" ); - } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) - - LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() - << " is " << psCanvConfig.second.data() ; - } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) -*/ + /// Comment to prevent clang format single lining + if (kFALSE == InitHistograms()) { throw InitTaskError("Failed to initialize the histograms."); } } // if( kTRUE == fbFillHistos ) } catch (InitTaskError& e) { @@ -252,6 +190,55 @@ bool CbmDeviceDigiEventSink::IsChannelNameAllowed(std::string channelName) LOG(error) << "Stop device."; return false; } + +bool CbmDeviceDigiEventSink::InitHistograms() +{ + /// Histos creation and obtain pointer on them + /// Trigger histo creation, filling vHistos and vCanvases + // bool initOK =CreateHistograms(); + bool initOK = true; + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); + std::vector<std::pair<TNamed*, std::string>> vHistos = {}; + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); + std::vector<std::pair<TCanvas*, std::string>> vCanvases = {}; + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + //--------------------------------------------------------------------// // handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0) bool CbmDeviceDigiEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/) @@ -319,7 +306,13 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/) std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; if ((fdMaxPublishTime < elapsedSeconds.count()) || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { - SendHistograms(); + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + fLastPublishTime = std::chrono::system_clock::now(); } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) } // if( kTRUE == fbFillHistos ) @@ -451,6 +444,12 @@ void CbmDeviceDigiEventSink::CheckTsQueues() bHoleFoundInBothQueues = true; } // while( !bHoleFoundInBothQueues ) + LOG(debug) << "CbmDeviceDigiEventSink::CheckTsQueues => buffered TS " << fmFullTsStorage.size() + << " buffered empties " << fvulMissedTsIndices.size(); + for (auto it = fmFullTsStorage.begin(); it != fmFullTsStorage.end(); ++it) { + LOG(debug) << "CbmDeviceDigiEventSink::CheckTsQueues => buffered TS index " << (*it).first; + } + /// Delete the processed entries fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs); fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs); @@ -479,13 +478,13 @@ void CbmDeviceDigiEventSink::PrepareTreeEntry(CbmEventTimeslice unpTs) /// Full TS Digis storage (optional usage, controlled by fbStoreFullTs!) if (fbStoreFullTs) { - if( 0 < unpTs.fvDigiT0.size() ) fvDigiT0->assign( unpTs.fvDigiT0.begin(), unpTs.fvDigiT0.end() ); - if( 0 < unpTs.fvDigiSts.size() ) fvDigiSts->assign( unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() ); - if( 0 < unpTs.fvDigiMuch.size() ) fvDigiMuch->assign( unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() ); - if( 0 < unpTs.fvDigiTrd.size() ) fvDigiTrd->assign( unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() ); - if( 0 < unpTs.fvDigiTof.size() ) fvDigiTof->assign( unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() ); - if( 0 < unpTs.fvDigiRich.size() ) fvDigiRich->assign( unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() ); - if( 0 < unpTs.fvDigiPsd.size() ) fvDigiPsd->assign( unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() ); + if (0 < unpTs.fvDigiT0.size()) fvDigiT0->assign(unpTs.fvDigiT0.begin(), unpTs.fvDigiT0.end()); + if (0 < unpTs.fvDigiSts.size()) fvDigiSts->assign(unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end()); + if (0 < unpTs.fvDigiMuch.size()) fvDigiMuch->assign(unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end()); + if (0 < unpTs.fvDigiTrd.size()) fvDigiTrd->assign(unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end()); + if (0 < unpTs.fvDigiTof.size()) fvDigiTof->assign(unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end()); + if (0 < unpTs.fvDigiRich.size()) fvDigiRich->assign(unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end()); + if (0 < unpTs.fvDigiPsd.size()) fvDigiPsd->assign(unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end()); } } void CbmDeviceDigiEventSink::DumpTreeEntry() @@ -506,7 +505,7 @@ void CbmDeviceDigiEventSink::DumpTreeEntry() fpFairRootMgr->FillEventHeader(fEvtHeader); fpFairRootMgr->Fill(); fpFairRootMgr->DeleteOldWriteoutBufferData(); -// fpFairRootMgr->Write(); + // fpFairRootMgr->Write(); /// Clear metadata array fTimeSliceMetaDataArray->Clear(); @@ -526,6 +525,51 @@ void CbmDeviceDigiEventSink::DumpTreeEntry() } //--------------------------------------------------------------------// + +bool CbmDeviceDigiEventSink::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ResetHistograms(kFALSE); + + return true; +} + bool CbmDeviceDigiEventSink::SendHistograms() { /// Serialize the array of histos into a single MQ message @@ -539,7 +583,7 @@ bool CbmDeviceDigiEventSink::SendHistograms() } // if( Send( message, fsChannelNameHistosInput ) < 0 ) /// Reset the histograms after sending them (but do not reset the time) - // fpAlgo->ResetHistograms( kFALSE ); + // ResetHistograms(kFALSE); return true; } @@ -567,6 +611,8 @@ void CbmDeviceDigiEventSink::Finish() fpFairRootMgr->CloseSink(); LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter << " full ones and " << fulMissedTsCounter << " missed/empty ones)"; + LOG(info) << "Still buffered TS " << fmFullTsStorage.size() << " and still buffered empties " + << fvulMissedTsIndices.size(); if (kTRUE == fbFillHistos) { SendHistograms(); diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.h b/MQ/mcbm/CbmDeviceDigiEventSink.h index 3090e1b8fbcd2f627f0de959eb68f25d70f804dc..138e3d0c183b8703630d72e939cf7375a21b1cb8 100644 --- a/MQ/mcbm/CbmDeviceDigiEventSink.h +++ b/MQ/mcbm/CbmDeviceDigiEventSink.h @@ -90,12 +90,10 @@ private: /// Algo enum settings std::string fsOutputFileName = "mcbm_digis_events.root"; /// message queues - std::string fsChannelNameMissedTs = "missedts"; - std::string fsChannelNameDataInput = "events"; - std::string fsChannelNameCommands = "commands"; - std::string fsChannelNameHistosInput = "histogram-in"; - std::string fsChannelNameHistosConfig = "histo-conf"; - std::string fsChannelNameCanvasConfig = "canvas-conf"; + std::string fsChannelNameMissedTs = "missedts"; + std::string fsChannelNameDataInput = "events"; + std::string fsChannelNameCommands = "commands"; + std::string fsChannelNameHistosInput = "histogram-in"; /// Histograms management uint32_t fuPublishFreqTs = 100; double_t fdMinPublishTime = 0.5; @@ -155,13 +153,16 @@ private: /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; /// Internal methods bool IsChannelNameAllowed(std::string channelName); - // Bool_t InitContainers(); + bool InitHistograms(); void CheckTsQueues(); void PrepareTreeEntry(CbmEventTimeslice unpTs); void DumpTreeEntry(); + bool SendHistoConfAndData(); bool SendHistograms(); void Finish(); }; diff --git a/MQ/mcbm/CbmDeviceUnpack.cxx b/MQ/mcbm/CbmDeviceUnpack.cxx index cd895deaafa20ea1eba952e73cef0efef1b6ba98..eebb7eb279eda904e2ee6b86fd194cde580a70ad 100644 --- a/MQ/mcbm/CbmDeviceUnpack.cxx +++ b/MQ/mcbm/CbmDeviceUnpack.cxx @@ -60,36 +60,17 @@ void CbmDeviceUnpack::InitTask() try { /// Read options from executable LOG(info) << "Init options for CbmDeviceUnpack."; - fsSetupName = fConfig->GetValue<std::string>("Setup"); - fuRunId = fConfig->GetValue<uint32_t>("RunId"); - fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs"); - fbOutputFullTimeSorting = fConfig->GetValue<bool>("FullTimeSort"); - fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>("SetTimeOffs"); - fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); - fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut"); - /// TODO: option to set fuDigiMaskedIdT0 !!!! - fsAllowedChannels[0] = fsChannelNameDataInput; - - // Get the information about created channels from the device - // Check if the defined channels from the topology (by name) - // are in the list of channels which are possible/allowed - // for the device - // The idea is to check at initilization if the devices are - // properly connected. For the time beeing this is done with a - // nameing convention. It is not avoided that someone sends other - // data on this channel. - //logger::SetLogLevel("INFO"); - - int noChannel = fChannels.size(); - LOG(info) << "Number of defined channels: " << noChannel; - for (auto const& entry : fChannels) { - LOG(info) << "Channel name: " << entry.first; - if (std::string::npos != entry.first.find(fsChannelNameDataInput)) { - if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); - OnData(entry.first, &CbmDeviceUnpack::HandleData); - } // if( entry.first.find( "ts" ) - } // for( auto const &entry : fChannels ) - InitContainers(); + fsSetupName = fConfig->GetValue<std::string>("Setup"); + fuRunId = fConfig->GetValue<uint32_t>("RunId"); + fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs"); + fbOutputFullTimeSorting = fConfig->GetValue<bool>("FullTimeSort"); + fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>("SetTimeOffs"); + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); } catch (InitTaskError& e) { LOG(error) << e.what(); @@ -97,26 +78,6 @@ catch (InitTaskError& e) { cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); } -bool CbmDeviceUnpack::IsChannelNameAllowed(std::string channelName) -{ - - for (auto const& entry : fsAllowedChannels) { - std::size_t pos1 = channelName.find(entry); - if (pos1 != std::string::npos) { - const vector<std::string>::const_iterator pos = - std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry); - const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin(); - LOG(info) << "Found " << entry << " in " << channelName; - LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; - return true; - } // if (pos1!=std::string::npos) - } // for(auto const &entry : fsAllowedChannels) - LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; - LOG(error) << "Stop device."; - - return false; -} - Bool_t CbmDeviceUnpack::InitContainers() { LOG(info) << "Init parameter containers for CbmDeviceUnpack."; @@ -565,15 +526,95 @@ CbmDeviceUnpack::InitParameters(std::vector<std::pair<std::string, std::shared_p return kTRUE; } -// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) -bool CbmDeviceUnpack::HandleData(FairMQMessagePtr& msg, int /*index*/) +bool CbmDeviceUnpack::InitHistograms() { + /// Histos creation and obtain pointer on them + /// Trigger histo creation on all associated algos + // ALGO: bool initOK = fMonitorAlgo->CreateHistograms(); + bool initOK = true; + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); + std::vector<std::pair<TNamed*, std::string>> vHistos = {}; + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); + std::vector<std::pair<TCanvas*, std::string>> vCanvases = {}; + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + +// Method called by run loop and requesting new data from the TS source whenever +bool CbmDeviceUnpack::ConditionalRun() +{ + /// First request a new TS (full one) + std::string message = "full"; + LOG(debug) << "Requesting new TS by sending message: full" << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } // if (Send(req, fsChannelNameDataInput) <= 0) + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } // else if (Receive(rep, fsChannelNameDataInput) < 0) + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } // else if (rep->GetSize() == 0) + + /// Message received, do Algo related Initialization steps if needed + if (0 == fulNumMessages) { + try { + InitContainers(); + } + catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); + } + } // if( 0 == fulNumMessages) + + if (0 == fulNumMessages) InitHistograms(); + fulNumMessages++; - LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize(); + LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize(); if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; - std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize()); + std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize()); std::istringstream iss(msgStr); boost::archive::binary_iarchive inputArchive(iss); @@ -603,8 +644,10 @@ bool CbmDeviceUnpack::HandleData(FairMQMessagePtr& msg, int /*index*/) /// Process the Timeslice DoUnpack(ts, 0); + LOG(debug) << "Unpack: Sending TS index " << ts.index(); /// Send digi vectors to ouput if (!SendUnpData()) return false; + LOG(debug) << "Unpack: Sent TS index " << ts.index(); // Reset the event header for a new timeslice fCbmTsEventHeader->Reset(); @@ -623,6 +666,23 @@ bool CbmDeviceUnpack::HandleData(FairMQMessagePtr& msg, int /*index*/) // ---- Psd ---- if (fPsdConfig) fPsdConfig->Reset(); + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + return true; } @@ -748,6 +808,69 @@ bool CbmDeviceUnpack::SendUnpData() } +bool CbmDeviceUnpack::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ALGO: fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + +bool CbmDeviceUnpack::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ALGO: fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + + CbmDeviceUnpack::~CbmDeviceUnpack() { if (fStsConfig) fStsConfig->GetUnpacker()->Finish(); @@ -769,7 +892,7 @@ Bool_t CbmDeviceUnpack::DoUnpack(const fles::Timeslice& ts, size_t /*component*/ uint64_t nComponents = ts.num_components(); // if (fDoDebugPrints) LOG(info) << "Unpack: TS index " << ts.index() << " components " << nComponents; - LOG(info) << "Unpack: TS index " << ts.index() << " components " << nComponents; + LOG(debug) << "Unpack: TS index " << ts.index() << " components " << nComponents; for (uint64_t component = 0; component < nComponents; component++) { auto systemId = static_cast<std::uint16_t>(ts.descriptor(component, 0).sys_id); diff --git a/MQ/mcbm/CbmDeviceUnpack.h b/MQ/mcbm/CbmDeviceUnpack.h index 1eb6ad1a001270f1109ab63211690330ad7b6ab9..b53bdd7d7e9ae02394a478e79bf18648eb105361 100644 --- a/MQ/mcbm/CbmDeviceUnpack.h +++ b/MQ/mcbm/CbmDeviceUnpack.h @@ -23,6 +23,7 @@ #include "Rtypes.h" #include "TObjArray.h" +#include <chrono> #include <map> #include <vector> @@ -45,7 +46,7 @@ public: protected: virtual void InitTask(); - bool HandleData(FairMQMessagePtr&, int); + bool ConditionalRun(); bool HandleCommand(FairMQMessagePtr&, int); /** @brief Set the Sts Unpack Config @param config */ @@ -100,22 +101,24 @@ private: /// User settings parameters std::string fsSetupName = "mcbm_beam_2021_07"; uint32_t fuRunId = 1588; - std::string fsChannelNameDataInput = "fullts"; - std::string fsChannelNameDataOutput = "unpts_0"; - std::string fsChannelNameCommands = "commands"; - UInt_t fuDigiMaskedIdT0 = 0x00005006; - UInt_t fuDigiMaskId = 0x0001FFFF; - - /// List of MQ channels names - std::vector<std::string> fsAllowedChannels = {fsChannelNameDataInput}; + /// message queues + std::string fsChannelNameDataInput = "ts-request"; + std::string fsChannelNameDataOutput = "unpts_0"; + std::string fsChannelNameCommands = "commands"; + std::string fsChannelNameHistosInput = "histogram-in"; + /// Histograms management + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; /// Parameters management // TList* fParCList = nullptr; Bool_t InitParameters(std::vector<std::pair<std::string, std::shared_ptr<FairParGenericSet>>>* reqparvec); /// Statistics & first TS rejection - uint64_t fulNumMessages = 0; - uint64_t fulTsCounter = 0; + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); /** @brief Map to store a name for the unpackers and the processed amount of digis, key = fkFlesId*/ std::map<std::uint16_t, std::pair<std::string, size_t>> fNameMap = {}; //! /** @brief Map to store the cpu and wall time, key = fkFlesId*/ @@ -146,11 +149,24 @@ private: Double_t fdTsFullSizeInNs = -1.0; //! Total size of all MS in a TS, [nanoseconds] TimesliceMetaData* fTsMetaData; - bool IsChannelNameAllowed(std::string channelName); + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; + Bool_t InitContainers(); + bool InitHistograms(); Bool_t DoUnpack(const fles::Timeslice& ts, size_t component); void Finish(); bool SendUnpData(); + bool SendHistoConfAndData(); + bool SendHistograms(); std::shared_ptr<CbmTrdSpadic> GetTrdSpadic(bool useAvgBaseline); diff --git a/MQ/mcbm/UnpBuildSink_missing_features.txt b/MQ/mcbm/UnpBuildSink_missing_features.txt index 70add5ccf54767ecbdf5c0d7a7eff74c9daf98a9..fd500ae3f14cbe7248c7efd27189659b6b4f6b2f 100644 --- a/MQ/mcbm/UnpBuildSink_missing_features.txt +++ b/MQ/mcbm/UnpBuildSink_missing_features.txt @@ -1,5 +1,4 @@ High priority -- +++++ Switch Sampler-Unpackers connection from Push-Pull to Rep-Req! Highest priority for memory and load-balancing performances! See monitoring examples! - Make the parameter server the single source for the CbmSetup object (remove all disk accesses in Unpacker device!) Low priority diff --git a/MQ/mcbm/runDigiEventSink.cxx b/MQ/mcbm/runDigiEventSink.cxx index bd3d5a6a7a1c3a884fcd03acd2b7d789477b88bd..06e6a97e4fb289cdd8a0baf9c6193e7f1aa579f6 100644 --- a/MQ/mcbm/runDigiEventSink.cxx +++ b/MQ/mcbm/runDigiEventSink.cxx @@ -22,17 +22,14 @@ void addCustomOptions(bpo::options_description& options) "MQ channel name for built events"); options.add_options()("FillHistos", bpo::value<bool>()->default_value(false), "Fill histograms and send them to histo server if true"); - options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), - "MQ channel name for histos"); - options.add_options()("ChNameHistCfg", bpo::value<std::string>()->default_value("histo-conf"), - "MQ channel name for histos config"); - options.add_options()("ChNameCanvCfg", bpo::value<std::string>()->default_value("canvas-conf"), - "MQ channel name for canvases config"); + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), "Minimal time between two publishing"); options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), "Maximal time between two publishing"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceDigiEventSink(); } diff --git a/MQ/mcbm/runUnpack.cxx b/MQ/mcbm/runUnpack.cxx index 2eba9e3a9a65bad18b4c8d24b99d457ed2851407..5259fd39a449aa6fbe4cb09cc5a0b727d665079b 100644 --- a/MQ/mcbm/runUnpack.cxx +++ b/MQ/mcbm/runUnpack.cxx @@ -23,10 +23,18 @@ void addCustomOptions(bpo::options_description& options) options.add_options()("SetTimeOffs", bpo::value<std::vector<std::string>>()->multitoken()->composing(), "Set time offset in ns for selected detector, use string matching " "ECbmModuleId,dOffs e.g. kTof,-35.2"); - options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("fullts"), + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"), "MQ channel name for raw TS data"); options.add_options()("TsNameOut", bpo::value<std::string>()->default_value("unpts_0"), "MQ channel name for unpacked TS data"); + + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(0), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceUnpack(); } diff --git a/MQ/mcbm/startBuildRawEvents2021.sh.in b/MQ/mcbm/startBuildRawEvents2021.sh.in index 3718ef3d7e53614b4a5159470d1f45bef925c42c..1da57926ad7e072adfdaeda9bc18ece66b9b9eb4 100755 --- a/MQ/mcbm/startBuildRawEvents2021.sh.in +++ b/MQ/mcbm/startBuildRawEvents2021.sh.in @@ -89,7 +89,7 @@ LOGFILETAG+=".log" echo "Buffer size for parallel devices $_paraBuffSz" echo "Buffer size for singleton devices $_singBuffSz" -SAMPLER="MultiTsaSampler" +SAMPLER="RepReqTsSampler" SAMPLER+=" --id sampler1" #SAMPLER+=" --max-timeslices 0" #SAMPLER+=" --max-timeslices 10" @@ -110,13 +110,14 @@ SAMPLER+=" --high-water-mark 1000" SAMPLER+=" --no-split-ts 1" SAMPLER+=" --ChNameMissTs missedts" SAMPLER+=" --ChNameCmds commands" -SAMPLER+=" --channel-config name=fullts,type=push,method=bind,address=tcp://127.0.0.1:11555" -#SAMPLER+=" --channel-config name=fullts,type=push,method=bind,address=tcp://127.0.0.1:11555,sndBufSize=$_singBuffSz,rcvBuffSize=$_paraBuffSz" -#SAMPLER+=" --transport shmem" -SAMPLER+=" --transport zeromq" -#SAMPLER+=" --transport nanomsg" +SAMPLER+=" --PubFreqTs $_pubfreqts" +SAMPLER+=" --PubTimeMin $_pubminsec" +SAMPLER+=" --PubTimeMax $_pubmaxsec" +SAMPLER+=" --channel-config name=ts-request,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11555" +SAMPLER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" SAMPLER+=" --channel-config name=missedts,type=pub,method=bind,address=tcp://127.0.0.1:11006" SAMPLER+=" --channel-config name=commands,type=pub,method=bind,address=tcp://127.0.0.1:11007" +SAMPLER+=" --transport zeromq" # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log SAMPLER_LOG="sampler1_$LOGFILETAG" @@ -142,19 +143,16 @@ while (( _iMoni < _nbmoni )); do UNPACKER+=" --SetTimeOffs kTOF,-1220" UNPACKER+=" --SetTimeOffs kRICH,254800" UNPACKER+=" --SetTimeOffs kPSD,0" + UNPACKER+=" --PubFreqTs $_pubfreqts" + UNPACKER+=" --PubTimeMin $_pubminsec" + UNPACKER+=" --PubTimeMax $_pubmaxsec" UNPACKER+=" --TsNameOut unpts$_iMoni" - UNPACKER+=" --channel-config name=fullts,type=pull,method=connect,address=tcp://127.0.0.1:11555" - #UNPACKER+=" --transport shmem" - UNPACKER+=" --transport zeromq" - #UNPACKER+=" --transport nanomsg" -# UNPACKER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005" + UNPACKER+=" --channel-config name=ts-request,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11555" UNPACKER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" UNPACKER+=" --channel-config name=unpts$_iMoni,type=push,method=bind,transport=zeromq,address=tcp://127.0.0.1:$_iPort" -# UNPACKER+=" --channel-config name=unpts$_iMoni,type=push,method=bind,transport=zeromq,address=tcp://127.0.0.1:$_iPort,sndBufSize=$_paraBuffSz,rcvBuffSize=$_paraBuffSz" # UNPACKER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" - #UNPACKER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" - #UNPACKER+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0" - #UNPACKER+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0" + UNPACKER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" + UNPACKER+=" --transport zeromq" # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log UNPACKER_LOG="unp$_iMoni" @@ -191,15 +189,11 @@ while (( _iMoni < _nbmoni )); do EVTBUILDER+=" --TsNameIn unpts$_iMoni" EVTBUILDER+=" --EvtNameOut events" EVTBUILDER+=" --channel-config name=unpts$_iMoni,type=pull,method=connect,transport=zeromq,address=tcp://127.0.0.1:$_iPort" - #EVTBUILDER+=" --transport shmem" - EVTBUILDER+=" --transport zeromq" - #EVTBUILDER+=" --transport nanomsg" EVTBUILDER+=" --channel-config name=events,type=push,method=connect,transport=zeromq,address=tcp://127.0.0.1:11556" # EVTBUILDER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" EVTBUILDER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" EVTBUILDER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" - EVTBUILDER+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0" - EVTBUILDER+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0" + EVTBUILDER+=" --transport zeromq" # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log EVTBUILDER_LOG="build$_iMoni" @@ -215,6 +209,7 @@ done EVTSINK="DigiEventSink" EVTSINK+=" --id evtsink1" EVTSINK+=" --severity info" +#EVTSINK+=" --severity debug" #EVTSINK+=" --StoreFullTs 1" EVTSINK+=" --OutFileName mcbm_digis_events.root" EVTSINK+=" --FillHistos false" @@ -223,12 +218,9 @@ EVTSINK+=" --PubTimeMin $_pubminsec" EVTSINK+=" --PubTimeMax $_pubmaxsec" EVTSINK+=" --EvtNameIn events" EVTSINK+=" --channel-config name=events,type=pull,method=bind,transport=zeromq,address=tcp://127.0.0.1:11556" -#EVTSINK+=" --channel-config name=events,type=pull,method=bind,transport=zeromq,address=tcp://127.0.0.1:11556,sndBufSize=$_paraBuffSz,rcvBuffSize=$_unpBufSz" EVTSINK+=" --channel-config name=missedts,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11006" EVTSINK+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" EVTSINK+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666" -EVTSINK+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0" -EVTSINK+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0" # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log EVTSINK_LOG="evtsink1_$LOGFILETAG" @@ -244,7 +236,6 @@ PARAMETERSERVER="parmq-server" PARAMETERSERVER+=" --id parmq-server" PARAMETERSERVER+=" --severity info" PARAMETERSERVER+=" --channel-name parameters" -#PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005" PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" PARAMETERSERVER+=" --first-input-name $_parfileSts;$_parfileMuch;$_parfileTrdAsic;$_parfileTrdDigi;$_parfileTrdGas;$_parfileTrdGain;$_parfileTof;$_parfileRich;$_parfilePsd" PARAMETERSERVER+=" --first-input-type ASCII" diff --git a/MQ/source/CbmMQTsSamplerRepReq.cxx b/MQ/source/CbmMQTsSamplerRepReq.cxx index a650b92ba8e027a1f548f98c7afcf80ed00df61c..0f39bbd7b63e513e9e1d8268c2e10a592b7efde8 100644 --- a/MQ/source/CbmMQTsSamplerRepReq.cxx +++ b/MQ/source/CbmMQTsSamplerRepReq.cxx @@ -434,7 +434,6 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() std::unique_ptr<fles::Timeslice> timeslice = fSource->get(); if (timeslice) { if (fulTsCounter < fulMaxTimeslices) { - fulTsCounter++; const fles::Timeslice& ts = *timeslice; uint64_t uTsIndex = ts.index(); @@ -470,13 +469,18 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() } // if( 0 < fuPublishFreqTs ) /// Missed TS detection (only if output channel name defined by user) - if ((uTsIndex != (fulPrevTsIndex + 1)) && (0 != fulPrevTsIndex && 0 != uTsIndex)) { + if ((uTsIndex != (fulPrevTsIndex + 1)) && !(0 == fulPrevTsIndex && 0 == uTsIndex && 0 == fulTsCounter)) { LOG(info) << "Missed Timeslices. Old TS Index was " << fulPrevTsIndex << " New TS Index is " << uTsIndex << " diff is " << uTsIndex - fulPrevTsIndex << " Missing are " << uTsIndex - fulPrevTsIndex - 1; if ("" != fsChannelNameMissedTs) { /// Add missing TS indices to a vector and send it in appropriate channel std::vector<uint64_t> vulMissedIndices; + if (0 == fulPrevTsIndex && 0 == fulTsCounter) { + /// Catch case where we do not start with the first TS but in the middle of a run + vulMissedIndices.emplace_back(0); + } + /// Standard cases starting with first TS after the last transmitted one for (uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) { vulMissedIndices.emplace_back(ulMiss); } // for( uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) @@ -497,20 +501,37 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fulPrevTsIndex - 1); } // if( 0 < fuPublishFreqTs ) - } // if( ( uTsIndex != ( fulPrevTsIndex + 1 ) ) && ( 0 != fulPrevTsIndex && 0 != uTsIndex ) ) + } // if( ( uTsIndex != ( fulPrevTsIndex + 1 ) ) && !( 0 == fulPrevTsIndex && 0 == uTsIndex ) ) if (0 < fuPublishFreqTs) { fhMissedTS->Fill(0); fhMissedTSEvo->Fill(fdTimeToStart, 0, 1); } // else if( 0 < fuPublishFreqTs ) + fulTsCounter++; fulPrevTsIndex = uTsIndex; if (fulTsCounter % 10000 == 0) { LOG(info) << "Received TS " << fulTsCounter << " with index " << uTsIndex; } LOG(debug) << "Found " << ts.num_components() << " different components in timeslice"; + return timeslice; + } // if (fulTsCounter < fulMaxTimeslices) + else { + CalcRuntime(); + + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending an EOF to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::string sCmd = "EOF "; + sCmd += FormatDecPrintout(fulPrevTsIndex); + sCmd += " "; + sCmd += FormatDecPrintout(fulTsCounter); + SendCommand(sCmd); + } // if( "" != fsChannelNameCommands ) + + return nullptr; } // else of if (fulTsCounter < fulMaxTimeslices) - return timeslice; } // if (timeslice) else { CalcRuntime();