diff --git a/MQ/source/CbmMQTsaMultiSampler.cxx b/MQ/source/CbmMQTsaMultiSampler.cxx index d18a41f77ad26228b2a657483fc65fb7b3e9bf3a..b810a9f542d62596579091e7ff4b82f6a7c7b4dc 100644 --- a/MQ/source/CbmMQTsaMultiSampler.cxx +++ b/MQ/source/CbmMQTsaMultiSampler.cxx @@ -326,7 +326,7 @@ bool CbmMQTsaMultiSampler::InitHistograms() fhTsSize = new TH1I("TsSize", "Size of TS; Size [MB]", 15000, 0., 15000.); fhTsSizeEvo = new TProfile("TsSizeEvo", "Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.); fhTsMaxSizeEvo = new TH1F("TsMaxSizeEvo", "Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.); - fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, 0., 2.); + fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, -0.5, 1.5); fhMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; t [s]", 1800, 0., 1800.); /// Add histo pointers to the histo vector @@ -441,6 +441,12 @@ bool CbmMQTsaMultiSampler::ConditionalRun() InitHistograms(); } // if( 0 < fuPublishFreqTs ) + /// initialize the source (connect to emitter, ...) + if( 0 == fTSCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber *>(fSource) ) + { + dynamic_cast< fles::TimesliceMultiSubscriber *>(fSource)->InitTimesliceSubscriber(); + } // if( 0 == fTSCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) ) + auto timeslice = fSource->get(); if (timeslice) { if (fTSCounter < fMaxTimeslices) { @@ -480,31 +486,36 @@ bool CbmMQTsaMultiSampler::ConditionalRun() } // if( 0 < fuPublishFreqTs ) /// Missed TS detection (only if output channel name defined by user) - if ((uTsIndex != (fuPrevTsIndex + 1)) && (0 != fuPrevTsIndex && 0 != uTsIndex) && "" != fsChannelNameMissedTs) { - LOG(debug) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex; - /// Add missing TS indices to a vector and send it in appropriate channel - std::vector<uint64_t> vulMissedIndices; - for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) { - vulMissedIndices.emplace_back(ulMiss); - } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) - if (!SendMissedTsIdx(vulMissedIndices)) { - /// If command channel defined, send command to all "slaves" - if ("" != fsChannelNameCommands) { - /// Wait 1 s before sending a STOP to let all slaves finish processing previous data - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - SendCommand("STOP"); - } // if( "" != fsChannelNameCommands ) + if ((uTsIndex != (fuPrevTsIndex + 1)) && (0 != fuPrevTsIndex && 0 != uTsIndex)) { + LOG(info) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex + << " diff is " << uTsIndex - fuPrevTsIndex << " Missing are " << uTsIndex - fuPrevTsIndex - 1; + + if( "" != fsChannelNameMissedTs ) { + /// Add missing TS indices to a vector and send it in appropriate channel + std::vector<uint64_t> vulMissedIndices; + for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) { + vulMissedIndices.emplace_back(ulMiss); + } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) + if (!SendMissedTsIdx(vulMissedIndices)) { + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SendCommand("STOP"); + } // if( "" != fsChannelNameCommands ) - return false; - } // if( !SendMissedTsIdx( vulMissedIndices ) ) + return false; + } // if( !SendMissedTsIdx( vulMissedIndices ) ) + } // if( "" != fsChannelNameMissedTs ) if (0 < fuPublishFreqTs) { - fhMissedTS->Fill(1, uTsIndex - fuPrevTsIndex); - fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex); + fhMissedTS->Fill(1, uTsIndex - fuPrevTsIndex - 1); + fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex - 1); } // if( 0 < fuPublishFreqTs ) - } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && ( 0 != fuPrevTsIndex && 0 != uTsIndex ) && "" != fsChannelNameMissedTs ) - else if (0 < fuPublishFreqTs) { + } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && ( 0 != fuPrevTsIndex && 0 != uTsIndex ) ) + + if (0 < fuPublishFreqTs) { fhMissedTS->Fill(0); fhMissedTSEvo->Fill(fdTimeToStart, 0, 1); } // else if( 0 < fuPublishFreqTs ) diff --git a/external/ipc/CMakeLists.txt b/external/ipc/CMakeLists.txt index dea275cddc0d57740beece2d15c0ef40a40e2c0a..b8bf03911038938b5a812e9c9467666cfb810955 100644 --- a/external/ipc/CMakeLists.txt +++ b/external/ipc/CMakeLists.txt @@ -8,6 +8,7 @@ download_project_if_needed(PROJECT fles_ipc GIT_TAG "92ff50ead204d0acb4fccd9cbb9876817d077528" GIT_STASH TRUE SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/ipc + PATCH_COMMAND "patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/TimesliceMultiSubscriber_init.patch" TEST_FILE CMakeLists.txt ) diff --git a/external/ipc/TimesliceMultiSubscriber_init.patch b/external/ipc/TimesliceMultiSubscriber_init.patch new file mode 100644 index 0000000000000000000000000000000000000000..bcc17df7de048233e31e96f7edd0c6f70aefbde4 --- /dev/null +++ b/external/ipc/TimesliceMultiSubscriber_init.patch @@ -0,0 +1,31 @@ +diff --git a/lib/fles_ipc/TimesliceMultiSubscriber.cpp b/lib/fles_ipc/TimesliceMultiSubscriber.cpp +index 1668a8c..24952c9 100644 +--- a/lib/fles_ipc/TimesliceMultiSubscriber.cpp ++++ b/lib/fles_ipc/TimesliceMultiSubscriber.cpp +@@ -26,7 +26,7 @@ TimesliceMultiSubscriber::TimesliceMultiSubscriber( + L_(fatal) << "No server defined"; + exit(1); + } +- InitTimesliceSubscriber(); ++// InitTimesliceSubscriber(); + } + + void TimesliceMultiSubscriber::CreateHostPortFileList(std::string inputString) { +diff --git a/lib/fles_ipc/TimesliceMultiSubscriber.hpp b/lib/fles_ipc/TimesliceMultiSubscriber.hpp +index ed8e0d7..539a6e8 100644 +--- a/lib/fles_ipc/TimesliceMultiSubscriber.hpp ++++ b/lib/fles_ipc/TimesliceMultiSubscriber.hpp +@@ -42,10 +42,11 @@ public: + + bool eos() const override { return sortedSource_.empty(); } + ++ void InitTimesliceSubscriber(); ++ + private: + Timeslice* do_get() override; + +- void InitTimesliceSubscriber(); + void CreateHostPortFileList(std::string /*inputString*/); + std::unique_ptr<Timeslice> GetNextTimeslice(); + + diff --git a/fles/mcbm2018/CbmMcbm2018Source.cxx b/fles/mcbm2018/CbmMcbm2018Source.cxx index 3792c214882ea8ad2c1ca79b8f98cb759c811780..53a7328adb22e626a0e35929d6034bea874ae995 100644 --- a/fles/mcbm2018/CbmMcbm2018Source.cxx +++ b/fles/mcbm2018/CbmMcbm2018Source.cxx @@ -71,6 +71,10 @@ Bool_t CbmMcbm2018Source::Init() fileList.pop_back(); // Remove the last ; fSource.reset(new fles::TimesliceMultiSubscriber(fileList, fuSubscriberHwm)); + /// Initialize the Multisubscriber + /// (This restores the original behavior after modifications needed to make the MQ version + dynamic_cast< fles::TimesliceMultiSubscriber *>(fSource.get())->InitTimesliceSubscriber(); + if (!fSource) { LOG(fatal) << "Could not connect to publisher."; } } else { @@ -276,15 +280,15 @@ Int_t CbmMcbm2018Source::FillBuffer() auto tsIndex = ts.index(); if ((tsIndex != (fTSNumber + 1)) && (fTSNumber != 0)) { LOG(debug) << "Missed Timeslices. Old TS Number was " << fTSNumber << " New TS Number is " << tsIndex; - fHistoMissedTS->Fill(1, tsIndex - fTSNumber); - fHistoMissedTSEvo->Fill(tsIndex, 1, tsIndex - fTSNumber); + fHistoMissedTS->Fill(1, tsIndex - fTSNumber - 1); + fHistoMissedTSEvo->Fill(tsIndex, 1, tsIndex - fTSNumber - 1); fNofTSSinceLastTS = tsIndex - fTSNumber; } else { - fHistoMissedTS->Fill(0); - fHistoMissedTSEvo->Fill(tsIndex, 0, 1); fNofTSSinceLastTS = 1; } + fHistoMissedTS->Fill(0); + fHistoMissedTSEvo->Fill(tsIndex, 0, 1); fTSNumber = tsIndex; if (0 == fTSNumber % 1000) { LOG(info) << "Reading Timeslice " << fTSNumber; }