diff --git a/MQ/histoServer/CbmMqHistoServer.cxx b/MQ/histoServer/CbmMqHistoServer.cxx index 5f27627af8bdc1217ab4273ecc00c6ad2507f504..9b25d4cbff9f08c97e195d00b764479f4fa49654 100644 --- a/MQ/histoServer/CbmMqHistoServer.cxx +++ b/MQ/histoServer/CbmMqHistoServer.cxx @@ -38,7 +38,7 @@ CbmMqHistoServer::CbmMqHistoServer() , fsChannelNameHistosInput("histogram-in") , fsChannelNameHistosConfig("histo-conf") , fsChannelNameCanvasConfig("canvas-conf") - , fsHistoFileName("HistosMonitorPulser.root") + , fsHistoFileName("MqHistos.root") , fuHttpServerPort(8098) , fArrayHisto() , fvpsHistosFolder() @@ -87,6 +87,7 @@ void CbmMqHistoServer::InitTask() bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/) { + LOG(debug) << "CbmMqHistoServer::ReceiveData => Processing histograms update"; TObject* tempObject = nullptr; // Deserialize<RootSerializer>(*msg, tempObject); @@ -111,12 +112,14 @@ bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/) LOG(warning) << "Unsupported object type for " << pObj->GetName(); } // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) + LOG(debug) << "CbmMqHistoServer::ReceiveData => Deleting array"; /// Need to use Delete instead of Clear to avoid memory leak!!! arrayHisto->Delete(); /// If new histos received, try to prepare as many canvases as possible /// Should be expensive on start and cheap afterward if (!fbAllCanvasReady) { + LOG(debug) << "CbmMqHistoServer::ReceiveData => Checking for canvases updates"; for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { /// Jump canvases already ready if (fvbCanvasReady[uCanv]) continue; @@ -149,6 +152,8 @@ bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/) bMqHistoServerSaveHistos = kFALSE; } // if( bMqHistoServerSaveHistos ) */ + LOG(debug) << "CbmMqHistoServer::ReceiveData => Finished processing histograms update"; + return true; } @@ -201,7 +206,7 @@ bool CbmMqHistoServer::ReceiveCanvasConfig(FairMQMessagePtr& msg, int /*index*/) } // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv ) if (uPrevCanv < fvpsCanvasConfig.size()) { - LOG(warning) << " Ignored new configuration for histo " << tempObject.first + LOG(warning) << " Ignored new configuration for Canvas " << tempObject.first << " due to previously received one: " << tempObject.second; /// Not sure if we should return false here... } // if( uPrevCanv < fvpsCanvasConfig.size() ) @@ -222,7 +227,7 @@ bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/) /// Reject anything but a at least Header + Histo Config + Canvas Config + Histo Data if (parts.Size() < 4) { if (1 == parts.Size()) { - /// PAL, 09/04/2021, Debug message catching missed method overlad/polymorphism: + /// PAL, 09/04/2021, Debug message catching missed method overload/polymorphism: /// contrary to my expectation, if 2 method bound to same channel, one with FairMQMessagePtr and one with /// FairMQParts, all messages go to multipart version and FairMQMessagePtr is converted to size 1 FairMQParts LOG(debug) << "CbmMqHistoServer::ReceiveConfigAndData => only 1 parts found in input, " @@ -243,9 +248,27 @@ bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/) LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received configuration for " << pairHeader.first << " histos and " << pairHeader.second << " canvases"; - if (static_cast<size_t>(parts.Size()) != 1 + pairHeader.first + pairHeader.second + 1) { + uint32_t uOffsetHistoConfig = pairHeader.first; + if (0 == pairHeader.first) { + uOffsetHistoConfig = 1; + if (0 < (parts.At(uOffsetHistoConfig))->GetSize()) { + LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No histo config expected but corresponding message is" + << " not empty: " << (parts.At(uOffsetHistoConfig))->GetSize(); + } + } + + uint32_t uOffsetCanvasConfig = pairHeader.second; + if (0 == pairHeader.second) { + uOffsetCanvasConfig = 1; + if (0 < (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize()) { + LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No Canvas config expected but corresponding message is" + << " not empty: " << (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize(); + } + } + + if (static_cast<size_t>(parts.Size()) != 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1) { LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Number of parts not matching header: " << parts.Size() - << " instead of " << 1 + pairHeader.first + pairHeader.second + 1; + << " instead of " << 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1; } // if( parts.Size() != 1 + pairHeader.first + pairHeader.second ) /// Decode parts for histograms configuration @@ -255,11 +278,14 @@ bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/) /// Decode parts for histograms configuration for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) { - ReceiveCanvasConfig(parts.At(1 + pairHeader.first + uCanv), 0); + ReceiveCanvasConfig(parts.At(1 + uOffsetHistoConfig + uCanv), 0); } // for (UInt_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) /// Decode the histograms data now that the configuration is loaded - ReceiveData(parts.At(1 + pairHeader.first + pairHeader.second), 0); + ReceiveData(parts.At(1 + uOffsetHistoConfig + uOffsetCanvasConfig), 0); + + LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Finished processing composed message with " << parts.Size() + << " parts"; return true; } @@ -357,6 +383,7 @@ bool CbmMqHistoServer::ResetHistograms() } bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx) { + LOG(debug) << " Extracting configuration for canvas index " << uCanvIdx; CanvasConfig conf(ExtractCanvasConfigFromString(fvpsCanvasConfig[uCanvIdx].second)); /// First check if all objects to be drawn are present @@ -374,6 +401,8 @@ bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx) } // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx ) } // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx ) + LOG(info) << " All histos found for canvas " << conf.GetName().data() << ", now preparing it"; + /// Create new canvas and pads TCanvas* pNewCanv = new TCanvas(conf.GetName().data(), conf.GetTitle().data()); pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY()); @@ -405,7 +434,10 @@ bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx) dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data()); } // if( nullptr != dynamic_cast< TH1 *>( pObj ) ) else - LOG(warning) << "Unsupported object type for " << sName << " when preparing canvas " << conf.GetName(); + LOG(warning) << " Unsupported object type for " << sName << " when preparing canvas " << conf.GetName(); + + LOG(info) << " Configured histo " << sName << " on pad " << (1 + uPadIdx) << " for canvas " + << conf.GetName().data(); } // if( "nullptr" != sName ) } // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx ) } // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx ) @@ -414,7 +446,7 @@ bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx) fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first); fvbCanvasRegistered[uCanvIdx] = true; - LOG(info) << "registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder " + LOG(info) << " Registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder " << fvCanvas[uCanvIdx].second; /// Update flag telling whether all known canvases are registered diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx index b43711c253f465bd272b0fb32b7ee9c70841f1e0..b504518e7a903026f0f64c1798b4267cfb0bdd7e 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx @@ -267,13 +267,13 @@ try { /* clang-format on */ /// Create input vectors - fvDigiT0 = new std::vector<CbmTofDigi>(); - fvDigiSts = new std::vector<CbmStsDigi>(); - fvDigiMuch = new std::vector<CbmMuchDigi>(); - fvDigiTrd = new std::vector<CbmTrdDigi>(); - fvDigiTof = new std::vector<CbmTofDigi>(); - fvDigiRich = new std::vector<CbmRichDigi>(); - fvDigiPsd = new std::vector<CbmPsdDigi>(); + fvDigiT0 = new std::vector<CbmTofDigi>(1000000); + fvDigiSts = new std::vector<CbmStsDigi>(1000000); + fvDigiMuch = new std::vector<CbmMuchDigi>(1000000); + fvDigiTrd = new std::vector<CbmTrdDigi>(1000000); + fvDigiTof = new std::vector<CbmTofDigi>(1000000); + fvDigiRich = new std::vector<CbmRichDigi>(1000000); + fvDigiPsd = new std::vector<CbmPsdDigi>(1000000); fCbmTsEventHeader = new CbmTsEventHeader(); @@ -565,6 +565,13 @@ bool CbmDeviceBuildDigiEvents::SendHistoConfAndData() partsOut.AddPart(std::move(messageHist)); } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + /// Catch case where no histos are registered! + /// => Add empty message + if (0 == fvpsHistosFolder.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan(NewMessage()); @@ -573,6 +580,13 @@ bool CbmDeviceBuildDigiEvents::SendHistoConfAndData() partsOut.AddPart(std::move(messageCan)); } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + /// Catch case where no Canvases are registered! + /// => Add empty message + if (0 == fvpsCanvasConfig.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + /// Serialize the array of histos into a single MQ message FairMQMessagePtr msgHistos(NewMessage()); // Serialize<RootSerializer>(*msgHistos, &fArrayHisto); diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.cxx b/MQ/mcbm/CbmDeviceDigiEventSink.cxx index f0d12e15c5e89b0383d7ab10565efd361f8bdc47..c3306319149ec0b1a1d942f037bfa1556f477e06 100644 --- a/MQ/mcbm/CbmDeviceDigiEventSink.cxx +++ b/MQ/mcbm/CbmDeviceDigiEventSink.cxx @@ -546,6 +546,13 @@ bool CbmDeviceDigiEventSink::SendHistoConfAndData() partsOut.AddPart(std::move(messageHist)); } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + /// Catch case where no histos are registered! + /// => Add empty message + if (0 == fvpsHistosFolder.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan(NewMessage()); @@ -555,6 +562,13 @@ bool CbmDeviceDigiEventSink::SendHistoConfAndData() partsOut.AddPart(std::move(messageCan)); } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + /// Catch case where no Canvases are registered! + /// => Add empty message + if (0 == fvpsCanvasConfig.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + /// Serialize the array of histos into a single MQ message FairMQMessagePtr msgHistos(NewMessage()); // Serialize<RootSerializer>(*msgHistos, &fArrayHisto); diff --git a/MQ/mcbm/CbmDeviceEventSink.cxx b/MQ/mcbm/CbmDeviceEventSink.cxx index 584b756e747fbe37d983e9a85a873f473e8a2ba3..1da07214f543a517f19f26cd96f3f7340e71202a 100644 --- a/MQ/mcbm/CbmDeviceEventSink.cxx +++ b/MQ/mcbm/CbmDeviceEventSink.cxx @@ -498,6 +498,13 @@ bool CbmDeviceEventSink::SendHistoConfAndData() partsOut.AddPart(std::move(messageHist)); } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + /// Catch case where no histos are registered! + /// => Add empty message + if (0 == fvpsHistosFolder.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan(NewMessage()); @@ -507,6 +514,13 @@ bool CbmDeviceEventSink::SendHistoConfAndData() partsOut.AddPart(std::move(messageCan)); } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + /// Catch case where no Canvases are registered! + /// => Add empty message + if (0 == fvpsCanvasConfig.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + /// Serialize the array of histos into a single MQ message FairMQMessagePtr msgHistos(NewMessage()); // Serialize<RootSerializer>(*msgHistos, &fArrayHisto); diff --git a/MQ/mcbm/CbmDeviceUnpack.cxx b/MQ/mcbm/CbmDeviceUnpack.cxx index 6bda42347619d642ec49837504e9513128f61900..50c08718f1563bae48bf0c4321fbf3ba23b14da4 100644 --- a/MQ/mcbm/CbmDeviceUnpack.cxx +++ b/MQ/mcbm/CbmDeviceUnpack.cxx @@ -776,6 +776,13 @@ bool CbmDeviceUnpack::SendHistoConfAndData() partsOut.AddPart(std::move(messageHist)); } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + /// Catch case where no histos are registered! + /// => Add empty message + if (0 == fvpsHistosFolder.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan(NewMessage()); @@ -785,6 +792,13 @@ bool CbmDeviceUnpack::SendHistoConfAndData() partsOut.AddPart(std::move(messageCan)); } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + /// Catch case where no Canvases are registered! + /// => Add empty message + if (0 == fvpsCanvasConfig.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + /// Serialize the array of histos into a single MQ message FairMQMessagePtr msgHistos(NewMessage()); // Serialize<RootSerializer>(*msgHistos, &fArrayHisto); diff --git a/MQ/source/CbmMQTsSamplerRepReq.cxx b/MQ/source/CbmMQTsSamplerRepReq.cxx index e16e791114f634bda8ded79db423a1dafd0a1ea4..61decdbeb490376c8a0b21f49e24152a46f6d6e6 100644 --- a/MQ/source/CbmMQTsSamplerRepReq.cxx +++ b/MQ/source/CbmMQTsSamplerRepReq.cxx @@ -354,9 +354,14 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) /// Initialize the histograms if (0 < fuPublishFreqTs && 0 == fulTsCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs ) + if (fbEofFound) { + /// Ignore all requests if EOS reached + return true; + } + if (fbNoSplitTs) { - if (!CreateAndSendFullTs()) { + if (!CreateAndSendFullTs() && !fbEofFound) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data @@ -365,7 +370,7 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) } // if( "" != fsChannelNameCommands ) return false; - } // if( !CreateAndSendFullTs( ts ) ) + } // if( !CreateAndSendFullTs( ts ) && !fbEofFound) } // if( fbNoSplitTs ) else if (fbSendTsPerSysId) { /// TODO: add support for alternative request with "system name" instead of "system ID" @@ -375,7 +380,7 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) /// This assumes that the order of the components does NOT change after the first TS /// That should be the case as the component index correspond to a physical link idx - if (!CreateCombinedComponentsPerSysId(iSysId)) { + if (!CreateCombinedComponentsPerSysId(iSysId) && !fbEofFound) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data @@ -384,7 +389,7 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) } // if( "" != fsChannelNameCommands ) return false; - } // if(!CreateAndCombineComponentsPerSysId(iSysId) ) + } // if(!CreateAndCombineComponentsPerSysId(iSysId) && !fbEofFound) } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs else if (fbSendTsPerBlock) { std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize()); @@ -392,7 +397,7 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) /// This assumes that the order of the components does NOT change after the first TS /// That should be the case as the component index correspond to a physical link idx - if (!CreateCombinedComponentsPerBlock(reqStr)) { + if (!CreateCombinedComponentsPerBlock(reqStr) && !fbEofFound) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data @@ -401,7 +406,7 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) } // if( "" != fsChannelNameCommands ) return false; - } // if( !CreateAndCombineComponentsPerChannel(reqStr) ) + } // if( !CreateAndCombineComponentsPerChannel(reqStr) && !fbEofFound) } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs ) /// Send histograms each 100 time slices. Should be each ~1s @@ -530,6 +535,8 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() SendCommand(sCmd); } // if( "" != fsChannelNameCommands ) + fbEofFound = true; + return nullptr; } // else of if (fulTsCounter < fulMaxTimeslices) } // if (timeslice) @@ -547,6 +554,8 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() SendCommand(sCmd); } // if( "" != fsChannelNameCommands ) + fbEofFound = true; + return nullptr; } // else of if (timeslice) } @@ -915,6 +924,13 @@ bool CbmMQTsSamplerRepReq::SendHistoConfAndData() partsOut.AddPart(std::move(messageHist)); } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + /// Catch case where no histos are registered! + /// => Add empty message + if (0 == fvpsHistosFolder.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan(NewMessage()); @@ -924,6 +940,13 @@ bool CbmMQTsSamplerRepReq::SendHistoConfAndData() partsOut.AddPart(std::move(messageCan)); } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + /// Catch case where no Canvases are registered! + /// => Add empty message + if (0 == fvpsCanvasConfig.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + /// Serialize the array of histos into a single MQ message FairMQMessagePtr msgHistos(NewMessage()); // Serialize<RootSerializer>(*msgHistos, &fArrayHisto); diff --git a/MQ/source/CbmMQTsSamplerRepReq.h b/MQ/source/CbmMQTsSamplerRepReq.h index 5446a11d9aa8e195e739471091f63b4208c984bc..a5d186f741d5a86d22af2fa2fba127826e1d4df3 100644 --- a/MQ/source/CbmMQTsSamplerRepReq.h +++ b/MQ/source/CbmMQTsSamplerRepReq.h @@ -119,6 +119,9 @@ private: std::deque<std::unique_ptr<fles::Timeslice>> fdpTimesliceBuffer = {}; std::deque<std::vector<bool>> fdbCompSentFlags = {}; + /// Flag indicating the EOF was reached to avoid sending an emergency STOP + bool fbEofFound = false; + std::string fsChannelNameMissedTs = ""; std::string fsChannelNameCommands = "";