diff --git a/MQ/monitor/CMakeLists.txt b/MQ/monitor/CMakeLists.txt index e8366b2b7f979524e6d2fb91149a75f44f9f0e19..dd39732bd38ed926759d8d01f9cb6ec503de8076 100644 --- a/MQ/monitor/CMakeLists.txt +++ b/MQ/monitor/CMakeLists.txt @@ -70,11 +70,13 @@ If(FAIRLOGGER_FOUND) ) EndIf() +set(BASE_DEPENDENCIES ${DEPENDENCIES}) + set(EXE_NAME T0MonitorMcbm2018) set(SRCS CbmDeviceMonitorT0.cxx runMonitorT0.cxx) set(DEPENDENCIES - ${DEPENDENCIES} + ${BASE_DEPENDENCIES} ${FAIR_LIBS} ${BOOST_LIBS} fles_ipc @@ -95,7 +97,49 @@ set(EXE_NAME TofMonitorMcbm2018) set(SRCS CbmDeviceMonitorTof.cxx runMonitorTof.cxx) set(DEPENDENCIES - ${DEPENDENCIES} + ${BASE_DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + fles_ipc + CbmFlibMcbm2018 + CbmFlibFlesTools + CbmBase + CbmData + Core + RIO + Net + Hist + RHTTP +) +#GENERATE_LIBRARY() +GENERATE_EXECUTABLE() + +set(EXE_NAME T0MonitorReqMcbm) +set(SRCS CbmDeviceMonitorReqT0.cxx runMonitorReqT0.cxx) + +set(DEPENDENCIES + ${BASE_DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + fles_ipc + CbmFlibMcbm2018 + CbmFlibFlesTools + CbmBase + CbmData + Core + RIO + Net + Hist + RHTTP +) +#GENERATE_LIBRARY() +GENERATE_EXECUTABLE() + +set(EXE_NAME TofMonitorReqMcbm) +set(SRCS CbmDeviceMonitorReqTof.cxx runMonitorReqTof.cxx) + +set(DEPENDENCIES + ${BASE_DEPENDENCIES} ${FAIR_LIBS} ${BOOST_LIBS} fles_ipc diff --git a/MQ/monitor/CbmDeviceMonitorReqT0.cxx b/MQ/monitor/CbmDeviceMonitorReqT0.cxx new file mode 100644 index 0000000000000000000000000000000000000000..2b84a993043b89e6c811da425527159f1d4be29c --- /dev/null +++ b/MQ/monitor/CbmDeviceMonitorReqT0.cxx @@ -0,0 +1,357 @@ +/** @file CbmDeviceMonitorReqT0.cxx + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmDeviceMonitorReqT0.h" + +#include "CbmFlesCanvasTools.h" +#include "CbmMcbm2018MonitorAlgoT0.h" + +#include "StorableTimeslice.hpp" + +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" + +#include "TCanvas.h" +#include "TFile.h" +#include "TH1.h" +#include "TList.h" +#include "TNamed.h" + +#include "BoostSerializer.h" +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> + +#include "RootSerializer.h" +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; + + +CbmDeviceMonitorReqT0::CbmDeviceMonitorReqT0() : fMonitorAlgo {new CbmMcbm2018MonitorAlgoT0()} {} + +void CbmDeviceMonitorReqT0::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmMqStarHistoServer."; + fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs"); + fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz"); + fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin"); + fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax"); + fuOffSpillCountLimit = fConfig->GetValue<uint32_t>("SpillThr"); + fuOffSpillCountLimitNonPulser = fConfig->GetValue<uint32_t>("SpillThrNonPuls"); + fdSpillCheckInterval = fConfig->GetValue<double>("SpillCheckInt"); + std::string sChanMap = fConfig->GetValue<std::string>("ChanMap"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + + UInt_t uChanIdx = 0; + size_t charPosDel = sChanMap.find(','); + while (uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel) { + fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel)); + sChanMap = sChanMap.substr(charPosDel + 1); + uChanIdx++; + charPosDel = sChanMap.find(','); + } // while( uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel ) + if (uChanIdx < fvuChanMap.size()) { + fvuChanMap[uChanIdx] = std::stoul(sChanMap); + } // if( uChanIdx < fvuChanMap.size() ) + + LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; + LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; + LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; + + if ("" == fsTsBlockName) { + // + LOG(info) << "Requesting TS using the SysId: 0x" << std::hex << kusSysId << std::dec; + } + else { + // + LOG(info) << "Requesting TS using the following block name: " << fsTsBlockName; + } +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); +} + +bool CbmDeviceMonitorReqT0::InitContainers() +{ + LOG(info) << "Init parameter containers for CbmDeviceMonitorReqT0."; + + fParCList = fMonitorAlgo->GetParList(); + + for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) { + FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC)); + fParCList->Remove(tempObj); + std::string paramName {tempObj->GetName()}; + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + + // Her must come the proper Runid + std::string message = paramName + ",111"; + LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message; + + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + FairParGenericSet* newObj = nullptr; + + if (Send(req, "parameters") > 0) { + if (Receive(rep, "parameters") >= 0) { + if (rep->GetSize() != 0) { + CbmMQTMessage tmsg(rep->GetData(), rep->GetSize()); + newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass())); + LOG(info) << "Received unpack parameter from the server:"; + newObj->print(); + } + else { + LOG(error) << "Received empty reply. Parameter not available"; + } // if (rep->GetSize() != 0) + } // if (Receive(rep, "parameters") >= 0) + } // if (Send(req, "parameters") > 0) + fParCList->AddAt(newObj, iparC); + delete tempObj; + } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) + + /// Need to add accessors for all options + fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs); + fMonitorAlgo->SetMonitorMode(kTRUE); + fMonitorAlgo->SetHistoryHistoSize(fuHistoryHistoSize); + fMonitorAlgo->SetPulserTotLimits(fuMinTotPulser, fuMaxTotPulser); + fMonitorAlgo->SetSpillThreshold(fuOffSpillCountLimit); + fMonitorAlgo->SetSpillThresholdNonPulser(fuOffSpillCountLimitNonPulser); + fMonitorAlgo->SetSpillCheckInterval(fdSpillCheckInterval); + fMonitorAlgo->SetChannelMap(fvuChanMap[0], fvuChanMap[1], fvuChanMap[2], fvuChanMap[3], fvuChanMap[4], fvuChanMap[5], + fvuChanMap[6], fvuChanMap[7]); + + // fMonitorAlgo->AddMsComponentToList(0, 0x90); + + Bool_t initOK = fMonitorAlgo->InitContainers(); + + return initOK; +} + +bool CbmDeviceMonitorReqT0::InitHistograms() +{ + /// Histos creation and obtain pointer on them + /// Trigger histo creation on all associated algos + bool initOK = fMonitorAlgo->CreateHistograms(); + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + + +bool CbmDeviceMonitorReqT0::ConditionalRun() +{ + /// First request a new TS (full or single system components or multi-syst components block) + std::string message = fsTsBlockName; + if ("" == message) message = std::to_string(kusSysId); + LOG(debug) << "Requesting new TS by sending message: " << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } // if (Send(req, fsChannelNameDataInput) <= 0) + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } // else if (Receive(rep, fsChannelNameDataInput) < 0) + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } // else if (rep->GetSize() == 0) + + /// Message received, do Algo related Initialization steps if needed + if (0 == fulNumMessages) { + try { + InitContainers(); + } + catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); + } + } // if( 0 == fulNumMessages) + + if (0 == fulNumMessages) InitHistograms(); + + fulNumMessages++; + LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize(); + + if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; + + std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize()); + std::istringstream iss(msgStr); + boost::archive::binary_iarchive inputArchive(iss); + + /// Create an empty TS and fill it with the incoming message + fles::StorableTimeslice component {0}; + inputArchive >> component; + + /// Process the Timeslice + DoUnpack(component, 0); + + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + + return true; +} + +bool CbmDeviceMonitorReqT0::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmDeviceMonitorReqT0::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + +bool CbmDeviceMonitorReqT0::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + + +CbmDeviceMonitorReqT0::~CbmDeviceMonitorReqT0() {} + + +Bool_t CbmDeviceMonitorReqT0::DoUnpack(const fles::Timeslice& ts, size_t /*component*/) +{ + fulTsCounter++; + + if (kFALSE == fbComponentsAddedToList) { + for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) { + if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) { + fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysId); + } // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id ) + } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp ) + fbComponentsAddedToList = kTRUE; + } // if( kFALSE == fbComponentsAddedToList ) + + if (kFALSE == fMonitorAlgo->ProcessTs(ts)) { + LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class"; + return kTRUE; + } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) ) + + /// Clear the digis vector in case it was filled + fMonitorAlgo->ClearVector(); + + if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices"; + + return kTRUE; +} + +void CbmDeviceMonitorReqT0::Finish() {} diff --git a/MQ/monitor/CbmDeviceMonitorReqT0.h b/MQ/monitor/CbmDeviceMonitorReqT0.h new file mode 100644 index 0000000000000000000000000000000000000000..fa214428dade65a6b4b251e449b4d452363b1c8d --- /dev/null +++ b/MQ/monitor/CbmDeviceMonitorReqT0.h @@ -0,0 +1,93 @@ +/** @file CbmDeviceMonitorReqT0.h + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#ifndef CBMDEVICEMONITORREQT0_H_ +#define CBMDEVICEMONITORREQT0_H_ + +#include "Timeslice.hpp" + +#include "FairMQDevice.h" + +#include "Rtypes.h" +#include "TMessage.h" +#include "TObjArray.h" + +#include <chrono> +#include <map> +#include <vector> + +class TList; +class CbmMcbm2018MonitorAlgoT0; + +class CbmDeviceMonitorReqT0 : public FairMQDevice { +public: + CbmDeviceMonitorReqT0(); + virtual ~CbmDeviceMonitorReqT0(); + +protected: + virtual void InitTask(); + virtual bool ConditionalRun(); + +private: + /// Constants + static const uint16_t kusSysId = 0x90; + + /// Control flags + Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice + Bool_t fbComponentsAddedToList = kFALSE; + + /// User settings parameters + std::string fsChannelNameDataInput = "ts-request"; + std::string fsTsBlockName = "t0block"; + std::string fsChannelNameHistosInput = "histogram-in"; + uint32_t fuHistoryHistoSize = 3600; + uint32_t fuMinTotPulser = 185; + uint32_t fuMaxTotPulser = 195; + uint32_t fuOffSpillCountLimit = 25; + uint32_t fuOffSpillCountLimitNonPulser = 10; + double fdSpillCheckInterval = 0.0128; + std::vector<uint32_t> fvuChanMap = {0, 1, 2, 3, 4, 5, 6, 7}; + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; + + /// Parameters management + TList* fParCList = nullptr; + + /// Statistics & first TS rejection + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); + + /// Processing algo + CbmMcbm2018MonitorAlgoT0* fMonitorAlgo; + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; + + bool InitContainers(); + bool InitHistograms(); + bool DoUnpack(const fles::Timeslice& ts, size_t component); + void Finish(); + bool SendHistoConfAndData(); + bool SendHistograms(); +}; + +// special class to expose protected TMessage constructor +class CbmMQTMessage : public TMessage { +public: + CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); } +}; + + +#endif /* CBMDEVICEMONITORREQT0_H_ */ diff --git a/MQ/monitor/CbmDeviceMonitorReqTof.cxx b/MQ/monitor/CbmDeviceMonitorReqTof.cxx new file mode 100644 index 0000000000000000000000000000000000000000..513f5f924d82e3caeda0c4549d9ed1b8e075fe7d --- /dev/null +++ b/MQ/monitor/CbmDeviceMonitorReqTof.cxx @@ -0,0 +1,340 @@ +/** @file CbmDeviceMonitorReqTof.cxx + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmDeviceMonitorReqTof.h" + +#include "CbmFlesCanvasTools.h" +#include "CbmMQDefs.h" +#include "CbmMcbm2018MonitorAlgoTof.h" + +#include "StorableTimeslice.hpp" + +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" + +#include "TCanvas.h" +#include "TFile.h" +#include "TH1.h" +#include "TList.h" +#include "TNamed.h" + +#include "BoostSerializer.h" +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> + +#include "RootSerializer.h" +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; + +CbmDeviceMonitorReqTof::CbmDeviceMonitorReqTof() : fMonitorAlgo {new CbmMcbm2018MonitorAlgoTof()} {} + +void CbmDeviceMonitorReqTof::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmMqStarHistoServer."; + + fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs"); + fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni"); + fbIgnoreCriticalErrors = fConfig->GetValue<bool>("IgnCritErr"); + fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz"); + fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin"); + fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax"); + fiGdpbIndex = fConfig->GetValue<int32_t>("GdpbIdx"); + + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + + LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; + LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; + LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; + + /// Set the Monitor Algo in Absolute time scale + fMonitorAlgo->UseAbsoluteTime(); +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + // Wrapper defined in CbmMQDefs.h to support different FairMQ versions + cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); +} + +Bool_t CbmDeviceMonitorReqTof::InitContainers() +{ + LOG(info) << "Init parameter containers for CbmDeviceMonitorReqTof."; + + fParCList = fMonitorAlgo->GetParList(); + + for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) { + FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC)); + fParCList->Remove(tempObj); + std::string paramName {tempObj->GetName()}; + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + + // Here must come the proper Runid + std::string message = paramName + ",111"; + LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message; + + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + FairParGenericSet* newObj = nullptr; + + if (Send(req, "parameters") > 0) { + if (Receive(rep, "parameters") >= 0) { + if (rep->GetSize() != 0) { + CbmMQTMessage tmsg(rep->GetData(), rep->GetSize()); + newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass())); + LOG(info) << "Received unpack parameter from the server:"; + newObj->print(); + } + else { + LOG(error) << "Received empty reply. Parameter not available"; + } // if (rep->GetSize() != 0) + } // if (Receive(rep, "parameters") >= 0) + } // if (Send(req, "parameters") > 0) + fParCList->AddAt(newObj, iparC); + delete tempObj; + } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) + + /// Need to add accessors for all options + fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs); + fMonitorAlgo->SetDebugMonitorMode(fbDebugMonitorMode); + fMonitorAlgo->SetIgnoreCriticalErrors(fbIgnoreCriticalErrors); + fMonitorAlgo->SetHistoryHistoSize(fuHistoryHistoSize); + fMonitorAlgo->SetPulserTotLimits(fuMinTotPulser, fuMaxTotPulser); + fMonitorAlgo->SetGdpbIndex(fiGdpbIndex); + + Bool_t initOK = fMonitorAlgo->InitContainers(); + + return initOK; +} + +bool CbmDeviceMonitorReqTof::InitHistograms() +{ + /// Histos creation and obtain pointer on them + /// Trigger histo creation on all associated algos + bool initOK = fMonitorAlgo->CreateHistograms(); + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + + +bool CbmDeviceMonitorReqTof::ConditionalRun() +{ + /// First request a new TS (full or single system components or multi-syst components block) + std::string message = fsTsBlockName; + if ("" == message) message = std::to_string(kusSysIdTof); + LOG(debug) << "Requesting new TS by sending message: " << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } // if (Send(req, fsChannelNameDataInput) <= 0) + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } // else if (Receive(rep, fsChannelNameDataInput) < 0) + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } // else if (rep->GetSize() == 0) + + /// Message received, do Algo related Initialization steps if needed + if (0 == fulNumMessages) { + try { + InitContainers(); + } + catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); + } + } // if( 0 == fulNumMessages) + + if (0 == fulNumMessages) InitHistograms(); + + /// Process received message + fulNumMessages++; + LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize(); + + if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; + + std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize()); + std::istringstream iss(msgStr); + boost::archive::binary_iarchive inputArchive(iss); + + /// Create an empty TS and fill it with the incoming message + fles::StorableTimeslice component {0}; + inputArchive >> component; + + /// Process the Timeslice + DoUnpack(component, 0); + + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + + return true; +} + +bool CbmDeviceMonitorReqTof::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmDeviceMonitorReqTof::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + +bool CbmDeviceMonitorReqTof::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + + +CbmDeviceMonitorReqTof::~CbmDeviceMonitorReqTof() {} + + +Bool_t CbmDeviceMonitorReqTof::DoUnpack(const fles::Timeslice& ts, size_t /*component*/) +{ + fulTsCounter++; + + if (kFALSE == fbComponentsAddedToList) { + for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) { + if (kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) { + fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdTof); + } // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id ) + else if (kusSysIdT0 == ts.descriptor(uCompIdx, 0).sys_id) { + fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdT0); + } // if( kusSysIdT0 == ts.descriptor( uCompIdx, 0 ).sys_id ) + } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp ) + fbComponentsAddedToList = kTRUE; + } // if( kFALSE == fbComponentsAddedToList ) + + if (kFALSE == fMonitorAlgo->ProcessTs(ts)) { + LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class"; + return kTRUE; + } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) ) + + /// Clear the digis vector in case it was filled + fMonitorAlgo->ClearVector(); + + if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices"; + + return kTRUE; +} + +void CbmDeviceMonitorReqTof::Finish() {} diff --git a/MQ/monitor/CbmDeviceMonitorReqTof.h b/MQ/monitor/CbmDeviceMonitorReqTof.h new file mode 100644 index 0000000000000000000000000000000000000000..4804b5e1221806c3110ab042adccec5d90f3fecb --- /dev/null +++ b/MQ/monitor/CbmDeviceMonitorReqTof.h @@ -0,0 +1,93 @@ +/** @file CbmDeviceMonitorReqTof.h + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#ifndef CBMDEVICEMONITORREQTOF_H_ +#define CBMDEVICEMONITORREQTOF_H_ + +#include "Timeslice.hpp" + +#include "FairMQDevice.h" + +#include "Rtypes.h" +#include "TMessage.h" +#include "TObjArray.h" + +#include <chrono> +#include <map> +#include <vector> + +class TList; +class CbmMcbm2018MonitorAlgoTof; + +class CbmDeviceMonitorReqTof : public FairMQDevice { +public: + CbmDeviceMonitorReqTof(); + virtual ~CbmDeviceMonitorReqTof(); + +protected: + virtual void InitTask(); + virtual bool ConditionalRun(); + +private: + /// Constants + static const uint16_t kusSysIdTof = 0x60; + static const uint16_t kusSysIdT0 = 0x90; + + /// Control flags + Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice + Bool_t fbDebugMonitorMode = kFALSE; //! Switch ON the filling of a additional set of histograms + Bool_t fbIgnoreCriticalErrors = kTRUE; //! If ON not printout at all for critical errors + Bool_t fbComponentsAddedToList = kFALSE; + + /// User settings parameters + std::string fsChannelNameDataInput = "ts-request"; + std::string fsTsBlockName = "t0tofblock"; + std::string fsChannelNameHistosInput = "histogram-in"; + uint32_t fuHistoryHistoSize = 3600; + uint32_t fuMinTotPulser = 185; + uint32_t fuMaxTotPulser = 195; + int32_t fiGdpbIndex = -1; + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; + + /// Parameters management + TList* fParCList = nullptr; + + /// Statistics & first TS rejection + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); + + /// Processing algo + CbmMcbm2018MonitorAlgoTof* fMonitorAlgo; + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; + + bool InitContainers(); + bool InitHistograms(); + Bool_t DoUnpack(const fles::Timeslice& ts, size_t component); + void Finish(); + bool SendHistoConfAndData(); + bool SendHistograms(); +}; + +// special class to expose protected TMessage constructor +class CbmMQTMessage : public TMessage { +public: + CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); } +}; + + +#endif /* CBMDEVICEMONITORREQTOF_H_ */ diff --git a/MQ/monitor/runMonitorReqT0.cxx b/MQ/monitor/runMonitorReqT0.cxx new file mode 100644 index 0000000000000000000000000000000000000000..ba4068e147ceea7307b13b7ddf3b2ae69929b9a4 --- /dev/null +++ b/MQ/monitor/runMonitorReqT0.cxx @@ -0,0 +1,43 @@ +/** @file runMonitorReqT0.cxx + * @copyright Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmDeviceMonitorReqT0.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true"); + options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800), + "Size of evolution histos in seconds"); + options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut"); + options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut"); + options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection"); + options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10), + "Non pulser Hits Nb Thr for spill detection"); + options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128), + "Interval in seconds between count checks for spill detection"); + options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"), + "Set T0 channel map e.g. 0,1,2,3,4,5,6,7"); + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"), + "MQ channel name for TS data"); + options.add_options()("TsBlockName", bpo::value<std::string>()->default_value(""), + "Block name for requesting TS data, T0 SysId request if empty"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorReqT0(); } diff --git a/MQ/monitor/runMonitorReqTof.cxx b/MQ/monitor/runMonitorReqTof.cxx new file mode 100644 index 0000000000000000000000000000000000000000..406df569df75a2469c8134944f16af72f6809804 --- /dev/null +++ b/MQ/monitor/runMonitorReqTof.cxx @@ -0,0 +1,40 @@ +/** @file runMonitorReqTof.cxx + * @copyright Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmDeviceMonitorReqTof.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true"); + options.add_options()("DebugMoni", bpo::value<bool>()->default_value(false), "Debug Monitor Mode"); + options.add_options()("IgnCritErr", bpo::value<bool>()->default_value(true), "Ignore Critical Errors"); + options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800), + "Size of evolution histos in seconds"); + options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut"); + options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut"); + options.add_options()("GdpbIdx", bpo::value<int32_t>()->default_value(-1), + "Single gDPB selection by index, -1 (default) to disable"); + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"), + "MQ channel name for TS data"); + options.add_options()("TsBlockName", bpo::value<std::string>()->default_value(""), + "Block name for requesting TS data, TOF SysId request if empty"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorReqTof(); } diff --git a/MQ/source/CMakeLists.txt b/MQ/source/CMakeLists.txt index 383431039eb48f1dea1ab12489034ef53b6a463b..9b7ee9f90283bebcab13769b5b85d4222412dbaa 100644 --- a/MQ/source/CMakeLists.txt +++ b/MQ/source/CMakeLists.txt @@ -97,6 +97,24 @@ set(DEPENDENCIES ) GENERATE_EXECUTABLE() +set(EXE_NAME RepReqTsSampler) +set(SRCS CbmMQTsSamplerRepReq.cxx runTsSamplerRepReq.cxx) +set(DEPENDENCIES + ${DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + fles_ipc + CbmMQBase + CbmFlibFlesTools + Core + Gpad + Hist + Net + RHTTP + RIO +) +GENERATE_EXECUTABLE() + set(EXE_NAME TsaSamplerTof) set(SRCS CbmMQTsaSamplerTof.cxx runTsaSamplerTof.cxx) @@ -154,3 +172,21 @@ set(DEPENDENCIES CbmMQBase ) GENERATE_EXECUTABLE() + +set(EXE_NAME TsConsumerReqExample) +set(SRCS CbmTsConsumerReqDevExample.cxx runTsConsumerReqExample.cxx) +set(DEPENDENCIES + ${DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + fles_ipc + CbmMQBase + CbmFlibFlesTools + Core + Gpad + Hist + Net + RHTTP + RIO +) +GENERATE_EXECUTABLE() diff --git a/MQ/source/CbmMQTsSamplerRepReq.cxx b/MQ/source/CbmMQTsSamplerRepReq.cxx new file mode 100644 index 0000000000000000000000000000000000000000..79f4e73d168ca9eb1b2e9e6d9fabd38070cd48bd --- /dev/null +++ b/MQ/source/CbmMQTsSamplerRepReq.cxx @@ -0,0 +1,960 @@ +/** @file CbmMQTsSamplerRepReq.cxx + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmMQTsSamplerRepReq.h" + +#include "CbmFlesCanvasTools.h" +#include "CbmFormatDecHexPrintout.h" + +#include "TimesliceInputArchive.hpp" +#include "TimesliceMultiInputArchive.hpp" +#include "TimesliceMultiSubscriber.hpp" +#include "TimesliceSubscriber.hpp" + +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig + +#include <TCanvas.h> +#include <TH1F.h> +#include <TH1I.h> +#include <TProfile.h> + +#include "BoostSerializer.h" +#include <boost/algorithm/string.hpp> +#include <boost/archive/binary_oarchive.hpp> +//#include <boost/filesystem.hpp> +#include <boost/regex.hpp> +#include <boost/serialization/utility.hpp> + +#include "RootSerializer.h" + +//namespace filesys = boost::filesystem; + +#include <thread> // this_thread::sleep_for + +#include <algorithm> +#include <chrono> +#include <ctime> +#include <iomanip> +#include <sstream> +#include <string> + +#include <stdio.h> + +using namespace std; + +#include <stdexcept> + +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +CbmMQTsSamplerRepReq::CbmMQTsSamplerRepReq() + : FairMQDevice() + , fTime() + , fLastPublishTime {std::chrono::system_clock::now()} +{ +} + +void CbmMQTsSamplerRepReq::InitTask() +try { + // Get the values from the command line options (via fConfig) + fsFileName = fConfig->GetValue<string>("filename"); + fsDirName = fConfig->GetValue<string>("dirname"); + fsHost = fConfig->GetValue<string>("fles-host"); + fusPort = fConfig->GetValue<uint16_t>("fles-port"); + fulHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark"); + fulMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices"); + fsChannelNameTsRequest = fConfig->GetValue<std::string>("ChNameTsReq"); + fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts"); + fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid"); + fbSendTsPerBlock = fConfig->GetValue<bool>("send-ts-per-block"); + fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs"); + fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + + if (fbNoSplitTs) { + if (fbSendTsPerSysId) { + if (fbSendTsPerBlock) { + LOG(warning) << "Both no-split-ts, send-ts-per-sysid and send-ts-per-block options used => " + << " second and third one will be ignored!!!!"; + } // if( fbSendTsPerBlock ) + else + LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => " + << " second one will be ignored!!!!"; + } // if( fbSendTsPerSysId ) + else if (fbSendTsPerBlock) { + LOG(warning) << "Both no-split-ts and send-ts-per-block options used => " + << " second one will be ignored!!!!"; + } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId ) + else + LOG(debug) << "Running in no-split-ts mode!"; + } // if( fbNoSplitTs ) + else if (fbSendTsPerBlock) { + if (fbSendTsPerSysId) { + LOG(warning) << "Both send-ts-per-sysid and send-ts-per-block options used => " + << " second one will be ignored!!!!"; + } // if (fbSendTsPerSysId) + else + LOG(debug) << "Running in send-ts-per-block mode!"; + } // else if( fbSendTsPerBlock ) of if( fbNoSplitTs ) + else if (fbSendTsPerSysId) { + LOG(debug) << "Running in send-ts-per-sysid mode!"; + } // else if (fbSendTsPerSysId) else if( fbSendTsPerBlock ) of if( fbNoSplitTs ) + else { + LOG(debug) << "Running in no-split-ts mode by default!"; + fbNoSplitTs = true; + } // else of else if (fbSendTsPerSysId) else if( fbSendTsPerBlock ) of if( fbNoSplitTs ) + + /// Extract SysId and channel information if provided in the binary options + std::vector<std::string> vSysIdBlockPairs = fConfig->GetValue<std::vector<std::string>>("block-sysid"); + for (uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair) { + const size_t sep = vSysIdBlockPairs[uPair].find(':'); + if (string::npos == sep || 0 == sep || vSysIdBlockPairs[uPair].size() == sep) { + LOG(info) << vSysIdBlockPairs[uPair]; + throw InitTaskError("Provided pair of Block name + SysId is missing a : or an argument."); + } // if( string::npos == sep || 0 == sep || vSysIdBlockPairs[ uPair ].size() == sep ) + + /// Extract Block name + std::string sBlockName = vSysIdBlockPairs[uPair].substr(0, sep); + + /// Extract SysId + /// TODO: or component name + std::string sSysId = vSysIdBlockPairs[uPair].substr(sep + 1); + const size_t hexPos = sSysId.find("0x"); + uint16_t usSysId; + if (string::npos == hexPos) usSysId = std::stoi(sSysId); + else + usSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16); + + LOG(debug) << "Extracted block info from pair \"" << vSysIdBlockPairs[uPair] << "\": name is " << sBlockName + << " and SysId is " << usSysId << " extracted from " << sSysId; + + /// Check if SysId already in use + uint32_t uSysIdIdx = 0; + for (; uSysIdIdx < fSysId.size() && fSysId[uSysIdIdx] != usSysId; ++uSysIdIdx) {} + if (uSysIdIdx == fSysId.size()) { throw InitTaskError("Unknown System ID for " + vSysIdBlockPairs[uPair]); } + else if (true == fComponentActive[uSysIdIdx]) { + throw InitTaskError("System ID already in use by another block for " + vSysIdBlockPairs[uPair]); + } + fComponentActive[uSysIdIdx] = true; + + /// Look if Block is already defined + auto itBlock = fvBlocksToSend.begin(); + for (; itBlock != fvBlocksToSend.end(); ++itBlock) { + if ((*itBlock).first == sBlockName) break; + } // for( ; itBlock != fvBlocksToSend.end(); ++itBlock) + if (fvBlocksToSend.end() != itBlock) { + /// Block already there, add the SysId to its list + (*itBlock).second.insert(usSysId); + } // if( fvBlocksToSend.end() != itBlock ) + else { + /// Block unknown yet, add both Block and First SysId + fvBlocksToSend.push_back(std::pair<std::string, std::set<uint16_t>>(sBlockName, {usSysId})); + fvvCompPerBlock.push_back(std::vector<uint32_t>({})); + } // else of if( fSysId.end() != pos ) + + LOG(info) << vSysIdBlockPairs[uPair] << " Added SysId 0x" << std::hex << usSysId << std::dec << " to " + << sBlockName; + } // for( uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair ) + + if (0 == fulMaxTimeslices) fulMaxTimeslices = UINT_MAX; + + // Check which input is defined + // Possibilities: + // filename && ! dirname : single file + // filename with wildcards && dirname : all files with filename regex in the directory + // host && port : connect to the flesnet server + bool isGoodInputCombi {false}; + if (0 != fsFileName.size() && 0 == fsDirName.size() && 0 == fsHost.size() && 0 == fusPort) { + isGoodInputCombi = true; + fvsInputFileList.push_back(fsFileName); + } + else if (0 != fsFileName.size() && 0 != fsDirName.size() && 0 == fsHost.size() && 0 == fusPort) { + isGoodInputCombi = true; + fvsInputFileList.push_back(fsFileName); + } + else if (0 == fsFileName.size() && 0 == fsDirName.size() && 0 != fsHost.size() && 0 != fusPort) { + isGoodInputCombi = true; + LOG(info) << "Host: " << fsHost; + LOG(info) << "Port: " << fusPort; + } + else if (0 == fsFileName.size() && 0 == fsDirName.size() && 0 != fsHost.size() && 0 == fusPort) { + isGoodInputCombi = true; + LOG(info) << "Host string: " << fsHost; + } + else { + isGoodInputCombi = false; + } + + if (!isGoodInputCombi) { + throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory " + "or host + port are allowed combination."); + } + + LOG(info) << "MaxTimeslices: " << fulMaxTimeslices; + + if (0 == fsFileName.size() && 0 != fsHost.size() && 0 != fusPort) { + // Don't add the protocol since this is done now in the TimesliceMultiSubscriber + //std::string connector = "tcp://" + fsHost + ":" + std::to_string(fusPort); + std::string connector = fsHost + ":" + std::to_string(fusPort); + LOG(info) << "Open TSPublisher at " << connector; + fSource = new fles::TimesliceMultiSubscriber(connector); + } + else if (0 == fsFileName.size() && 0 != fsHost.size()) { + std::string connector = fsHost; + LOG(info) << "Open TSPublisher with host string: " << connector; + fSource = new fles::TimesliceMultiSubscriber(connector, fulHighWaterMark); + } + else { + // Create a ";" separated string with all file names + std::string fileList {""}; + for (const auto& obj : fvsInputFileList) { + std::string fileName = obj; + fileList += fileName; + fileList += ";"; + } + fileList.pop_back(); // Remove the last ; + LOG(info) << "Input File String: " << fileList; + fSource = new fles::TimesliceMultiInputArchive(fileList, fsDirName); + if (!fSource) { throw InitTaskError("Could open files from file list."); } + } + + LOG(info) << "High-Water Mark: " << fulHighWaterMark; + LOG(info) << "Max. Timeslices: " << fulMaxTimeslices; + if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs ) + else if (fbSendTsPerSysId) { + LOG(info) << "Sending components in separate TS per SysId"; + } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs ) + else if (fbSendTsPerBlock) { + LOG(info) << "Sending components in separate TS per block (multiple SysId)"; + } // else if( fbSendTsPerBlock ) of if( fbSendTsPerSysId ) of if( fbNoSplitTs ) + + OnData(fsChannelNameTsRequest, &CbmMQTsSamplerRepReq::HandleRequest); + + fTime = std::chrono::steady_clock::now(); +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); +} +catch (boost::bad_any_cast& e) { + LOG(error) << "Error during InitTask: " << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); +} + +bool CbmMQTsSamplerRepReq::InitHistograms() +{ + LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; + LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; + LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; + + /// Vector of pointers on each histo (+ optionally desired folder) + std::vector<std::pair<TNamed*, std::string>> vHistos = {}; + /// Vector of pointers on each canvas (+ optionally desired folder) + std::vector<std::pair<TCanvas*, std::string>> vCanvases = {}; + + /// Histos creation and obtain pointer on them + fhTsRate = new TH1I("TsRate", "TS rate; t [s]", 1800, 0., 1800.); + fhTsSize = new TH1I("TsSize", "Size of TS; Size [MB]", 15000, 0., 15000.); + fhTsSizeEvo = new TProfile("TsSizeEvo", "Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.); + fhTsMaxSizeEvo = new TH1F("TsMaxSizeEvo", "Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.); + fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, -0.5, 1.5); + fhMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; t [s]", 1800, 0., 1800.); + + /// Add histo pointers to the histo vector + vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, "Sampler")); + vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, "Sampler")); + vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, "Sampler")); + vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, "Sampler")); + vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, "Sampler")); + vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, "Sampler")); + + /// Canvases creation + Double_t w = 10; + Double_t h = 10; + fcSummary = new TCanvas("cSampSummary", "Sampler monitoring plots", w, h); + fcSummary->Divide(2, 3); + + fcSummary->cd(1); + gPad->SetGridx(); + gPad->SetGridy(); + fhTsRate->Draw("hist"); + + fcSummary->cd(2); + gPad->SetGridx(); + gPad->SetGridy(); + gPad->SetLogx(); + gPad->SetLogy(); + fhTsSize->Draw("hist"); + + fcSummary->cd(3); + gPad->SetGridx(); + gPad->SetGridy(); + fhTsSizeEvo->Draw("hist"); + + fcSummary->cd(4); + gPad->SetGridx(); + gPad->SetGridy(); + fhTsMaxSizeEvo->Draw("hist"); + + fcSummary->cd(5); + gPad->SetGridx(); + gPad->SetGridy(); + fhMissedTS->Draw("hist"); + + fcSummary->cd(6); + gPad->SetGridx(); + gPad->SetGridy(); + fhMissedTSEvo->Draw("el"); + + /// Add canvas pointers to the canvas vector + vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, "canvases")); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return true; +} + +bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) +{ + /// Initialize the histograms + if (0 < fuPublishFreqTs && 0 == fulTsCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs ) + + if (fbNoSplitTs) { + + if (!CreateAndSendFullTs()) { + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SendCommand("STOP"); + } // if( "" != fsChannelNameCommands ) + + return false; + } // if( !CreateAndSendFullTs( ts ) ) + } // if( fbNoSplitTs ) + else if (fbSendTsPerSysId) { + /// TODO: add support for alternative request with "system name" instead of "system ID" + std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize()); + int iSysId = std::stoi(reqStr); + LOG(debug) << "Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec; + + /// This assumes that the order of the components does NOT change after the first TS + /// That should be the case as the component index correspond to a physical link idx + if (!CreateCombinedComponentsPerSysId(iSysId)) { + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SendCommand("STOP"); + } // if( "" != fsChannelNameCommands ) + + return false; + } // if(!CreateAndCombineComponentsPerSysId(iSysId) ) + } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs + else if (fbSendTsPerBlock) { + std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize()); + LOG(debug) << "Received TS components block request from client: " << reqStr; + + /// This assumes that the order of the components does NOT change after the first TS + /// That should be the case as the component index correspond to a physical link idx + if (!CreateCombinedComponentsPerBlock(reqStr)) { + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SendCommand("STOP"); + } // if( "" != fsChannelNameCommands ) + + return false; + } // if( !CreateAndCombineComponentsPerChannel(reqStr) ) + } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs ) + + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulMessageCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulMessageCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + + return true; +} + +std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() +{ + /// Initialize the source (connect to emitter, ...) + if (0 == fulTsCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) { + dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber(); + } // if( 0 == fulTsCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) ) + + std::unique_ptr<fles::Timeslice> timeslice = fSource->get(); + if (timeslice) { + if (fulTsCounter < fulMaxTimeslices) { + fulTsCounter++; + + const fles::Timeslice& ts = *timeslice; + uint64_t uTsIndex = ts.index(); + + if (0 < fuPublishFreqTs) { + uint64_t uTsTime = ts.descriptor(0, 0).idx; + if (0 == fuStartTime) { fuStartTime = uTsTime; } // if( 0 == fuStartTime ) + fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9; + uint64_t uSizeMb = 0; + + for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) { + uSizeMb += ts.size_component(uComp) / (1024 * 1024); + } // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp ) + + + fhTsRate->Fill(fdTimeToStart); + fhTsSize->Fill(uSizeMb); + fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb); + + /// Fill max size per s (assumes the histo binning is 1 second!) + if (0. == fdLastMaxTime) { + fdLastMaxTime = fdTimeToStart; + fdTsMaxSize = uSizeMb; + } // if( 0. == fdLastMaxTime ) + else if (1. <= fdTimeToStart - fdLastMaxTime) { + fhTsMaxSizeEvo->Fill(fdLastMaxTime, fdTsMaxSize); + fdLastMaxTime = fdTimeToStart; + fdTsMaxSize = uSizeMb; + } // else if if( 1 <= fdTimeToStart - fdLastMaxTime ) + else if (fdTsMaxSize < uSizeMb) { + fdTsMaxSize = uSizeMb; + } // else if( fdTsMaxSize < uSizeMb ) + } // if( 0 < fuPublishFreqTs ) + + /// Missed TS detection (only if output channel name defined by user) + if ((uTsIndex != (fulPrevTsIndex + 1)) && (0 != fulPrevTsIndex && 0 != uTsIndex)) { + LOG(info) << "Missed Timeslices. Old TS Index was " << fulPrevTsIndex << " New TS Index is " << uTsIndex + << " diff is " << uTsIndex - fulPrevTsIndex << " Missing are " << uTsIndex - fulPrevTsIndex - 1; + + if ("" != fsChannelNameMissedTs) { + /// Add missing TS indices to a vector and send it in appropriate channel + std::vector<uint64_t> vulMissedIndices; + for (uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) { + vulMissedIndices.emplace_back(ulMiss); + } // for( uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) + if (!SendMissedTsIdx(vulMissedIndices)) { + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SendCommand("STOP"); + } // if( "" != fsChannelNameCommands ) + + return nullptr; + } // if( !SendMissedTsIdx( vulMissedIndices ) ) + } // if( "" != fsChannelNameMissedTs ) + + if (0 < fuPublishFreqTs) { + fhMissedTS->Fill(1, uTsIndex - fulPrevTsIndex - 1); + fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fulPrevTsIndex - 1); + } // if( 0 < fuPublishFreqTs ) + + } // if( ( uTsIndex != ( fulPrevTsIndex + 1 ) ) && ( 0 != fulPrevTsIndex && 0 != uTsIndex ) ) + + if (0 < fuPublishFreqTs) { + fhMissedTS->Fill(0); + fhMissedTSEvo->Fill(fdTimeToStart, 0, 1); + } // else if( 0 < fuPublishFreqTs ) + + fulPrevTsIndex = uTsIndex; + + if (fulTsCounter % 10000 == 0) { LOG(info) << "Received TS " << fulTsCounter << " with index " << uTsIndex; } + + LOG(debug) << "Found " << ts.num_components() << " different components in timeslice"; + } // else of if (fulTsCounter < fulMaxTimeslices) + return timeslice; + } // if (timeslice) + else { + CalcRuntime(); + + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending an EOF to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::string sCmd = "EOF "; + sCmd += FormatDecPrintout(fulPrevTsIndex); + sCmd += " "; + sCmd += FormatDecPrintout(fulTsCounter); + SendCommand(sCmd); + } // if( "" != fsChannelNameCommands ) + + return nullptr; + } // else of if (timeslice) +} + +bool CbmMQTsSamplerRepReq::AddNewTsInBuffer() +{ + /// Remove the first TS(s) in buffer if we reached the HighWater mark + while (fulHighWaterMark <= fdpTimesliceBuffer.size()) { + fdpTimesliceBuffer.pop_front(); + fdbCompSentFlags.pop_front(); + } // while( fulHighWaterMark <= fdpTimesliceBuffer.size() ) + + /// Add a new TS and "fail" if we did not get it + fdpTimesliceBuffer.push_back(GetNewTs()); + if (nullptr == fdpTimesliceBuffer.back()) { + fdpTimesliceBuffer.pop_back(); + return false; + } // if(nullptr == fdpTimesliceBuffer[fdpTimesliceBuffer.size() - 1]) + + /// Now that we got the TS, we can add the corresponding list of "Sent" flags, + /// with the proper dimension + if (fbSendTsPerBlock) { fdbCompSentFlags.push_back(std::vector<bool>(fvBlocksToSend.size(), false)); } + else { + fdbCompSentFlags.push_back(std::vector<bool>(fComponentActive.size(), false)); + } + return true; +} + +bool CbmMQTsSamplerRepReq::CreateAndSendFullTs() +{ + std::unique_ptr<fles::Timeslice> timeslice = GetNewTs(); + if (timeslice) { + /// Send full TS as response to the request + const fles::Timeslice& ts = *timeslice; + fles::StorableTimeslice fullTs {ts}; + if (!SendData(fullTs)) { + /// If command channel defined, send command to all "slaves" + if ("" != fsChannelNameCommands) { + /// Wait 1 s before sending a STOP to let all slaves finish processing previous data + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SendCommand("STOP"); + } // if( "" != fsChannelNameCommands ) + + return false; + } // if (!SendData(fullTs, uChanIdx)) + return true; + } // if (timeslice) + else { + return false; + } // else of if (timeslice) +} + +bool CbmMQTsSamplerRepReq::PrepareCompListPerSysId() +{ + if (false == fbListCompPerSysIdReady) { + /// Check if already at least one TS in the buffer (should not be the case + /// => if not, add one + if (0 == fdpTimesliceBuffer.size()) { + if (!AddNewTsInBuffer()) return false; + } // if( 0 == fdpTimesliceBuffer.size() ) + + if (nullptr == fdpTimesliceBuffer.front()) return false; + + for (uint32_t uCompIdx = 0; uCompIdx < fdpTimesliceBuffer.front()->num_components(); ++uCompIdx) { + uint16_t usMsSysId = fdpTimesliceBuffer.front()->descriptor(uCompIdx, 0).sys_id; + + const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usMsSysId); + if (fSysId.end() != pos) { + const vector<std::string>::size_type idx = pos - fSysId.begin(); + + fvvCompPerSysId[idx].push_back(uCompIdx); + } // if( fSysId.end() != pos ) + } // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx ) + + for (uint32_t uSysIdx = 0; uSysIdx < fComponents.size(); ++uSysIdx) { + std::stringstream ss; + ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex + << std::setw(2) << fSysId[uSysIdx] << std::dec << " :"; + + for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) { + ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp]; + } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp ) + + LOG(info) << ss.str(); + } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId ) + + fbListCompPerSysIdReady = true; + } // if( false == fbListCompPerSysIdReady ) + + return true; +} +bool CbmMQTsSamplerRepReq::CreateCombinedComponentsPerSysId(std::string sSystemName) +{ + /// Check if the requested System name is in the list of known components + /// 1) First build the list of components for each SysId if it was not already done + if (!PrepareCompListPerSysId()) return false; + + /// 2) Search for requested System name is in the list of known components, get its index and then send the TS + const vector<std::string>::const_iterator pos = std::find(fComponents.begin(), fComponents.end(), sSystemName); + if (fComponents.end() != pos) { + const vector<std::string>::size_type idx = pos - fComponents.begin(); + return CreateCombinedComponentsPerSysId(static_cast<uint32_t>(idx)); + } // if (fComponents.end() != pos) + else { + LOG(error) << "Did not find " << sSystemName << " in the list of known systems"; + return false; + } // else of if (fComponents.end() != pos) +} +bool CbmMQTsSamplerRepReq::CreateCombinedComponentsPerSysId(int iSysId) +{ + /// Check if the requested System ID is in the list of known components + /// 1) First build the list of components for each SysId if it was not already done + if (!PrepareCompListPerSysId()) return false; + + /// 2) Search for requested System ID is in the list of known components, get its index and then send the TS + const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId); + if (fSysId.end() != pos) { + const vector<int>::size_type idx = pos - fSysId.begin(); + return CreateCombinedComponentsPerSysId(static_cast<uint32_t>(idx)); + } // if (fSysId.end() != pos) + else { + LOG(error) << "Did not find 0x" << std::hex << iSysId << std::dec << " in the list of known systems"; + return false; + } // else of if (fSysId.end() != pos) +} +bool CbmMQTsSamplerRepReq::CreateCombinedComponentsPerSysId(uint uCompIndex) +{ + /// Then loop on all possible SysId and send TS with their respective components if needed + LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uCompIndex] << std::dec; + + if (0 < fvvCompPerSysId[uCompIndex].size()) { + /// Search if TS in buffer where all components for this system where not sent yet + uint32_t uTsIndex = 0; + for (; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex) { + if (false == fdbCompSentFlags[uTsIndex][uCompIndex]) break; + } // for( ; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex ) + + /// If all TS in buffer have sent this one, get a new TS + if (fdpTimesliceBuffer.size() == uTsIndex) { + --uTsIndex; + if (!AddNewTsInBuffer()) return false; + } // if( fdpTimesliceBuffer.size() == uTsIndex ) + + /// Prepare the custom TS and send it + fles::StorableTimeslice component {static_cast<uint32_t>(fdpTimesliceBuffer[uTsIndex]->num_core_microslices()), + fdpTimesliceBuffer[uTsIndex]->index()}; + + for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uCompIndex].size(); ++uComp) { + uint32_t uNumMsInComp = fdpTimesliceBuffer[uTsIndex]->num_microslices(fvvCompPerSysId[uCompIndex][uComp]); + component.append_component(uNumMsInComp); + + LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uCompIndex] << std::dec << " TS " + << fdpTimesliceBuffer[uTsIndex]->index() << " Comp " << fvvCompPerSysId[uCompIndex][uComp]; + + for (size_t m = 0; m < uNumMsInComp; ++m) { + component.append_microslice(uComp, m, + fdpTimesliceBuffer[uTsIndex]->descriptor(fvvCompPerSysId[uCompIndex][uComp], m), + fdpTimesliceBuffer[uTsIndex]->content(fvvCompPerSysId[uCompIndex][uComp], m)); + } // for( size_t m = 0; m < uNumMsInComp; ++m ) + } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uCompIndex ].size(); ++uComp ) + + LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uCompIndex] << std::dec << " with " + << component.num_components() << " components"; + + if (!SendData(component)) return false; + + fdbCompSentFlags[uTsIndex][uCompIndex] = true; + } // if (0 < fvvCompPerSysId[uCompIndex].size()) + + return true; +} + +bool CbmMQTsSamplerRepReq::PrepareCompListPerBlock() +{ + if (false == fbListCompPerBlockReady) { + /// 1) First build the list of components for each SysId if it was not already done + if (!PrepareCompListPerSysId()) return false; + + /// 2) Build the list of components for each block, based on its list of system IDs + for (auto itBlock = fvBlocksToSend.begin(); itBlock != fvBlocksToSend.end(); ++itBlock) { + auto uBlockIdx = itBlock - fvBlocksToSend.begin(); + + for (auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys) { + /// Check if this system ID is existing + const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), *itSys); + if (fSysId.end() != pos) { + const vector<int>::size_type idxSys = pos - fSysId.begin(); + + /// Add all components to the list + for (uint32_t uComp = 0; uComp < fvvCompPerSysId[idxSys].size(); ++uComp) { + fvvCompPerBlock[uBlockIdx].push_back(fvvCompPerSysId[idxSys][uComp]); + } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ idxSys ].size(); ++uComp ) + } // if (fSysId.end() != pos) + else { + LOG(error) << "Error when building the components list for block " << itBlock->first; + LOG(error) << "Did not find 0x" << std::hex << *itSys << std::dec << " in the list of known systems"; + return false; + } // else of if (fSysId.end() != pos) + } // for( auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys ) + } // for( auto itBlock = fvBlocksToSend.begin(); itBlock != fvBlocksToSend.end(); ++itBlock) + + fbListCompPerBlockReady = true; + } // if( false == fbListCompPerBlockReady ) + + return true; +} + +bool CbmMQTsSamplerRepReq::CreateCombinedComponentsPerBlock(std::string sBlockName) +{ + /// Check if the requested Block is in the list of known blocks + /// 1) First build the list of components for each block if it was not already done + if (!PrepareCompListPerBlock()) return false; + + /// 2) Search for requested block is in the list of known blocks, get its index and then send the TS + for (auto itKnownBlock = fvBlocksToSend.begin(); itKnownBlock != fvBlocksToSend.end(); ++itKnownBlock) { + if ((*itKnownBlock).first == sBlockName) { + auto uBlockIdx = itKnownBlock - fvBlocksToSend.begin(); + + /// Search if TS in buffer where all components for this system where not sent yet + uint32_t uTsIndex = 0; + for (; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex) { + if (false == fdbCompSentFlags[uTsIndex][uBlockIdx]) break; + } // for( ; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex ) + + /// If all TS in buffer have sent this one, get a new TS + if (fdpTimesliceBuffer.size() == uTsIndex) { + --uTsIndex; + if (!AddNewTsInBuffer()) return false; + } // if( fdpTimesliceBuffer.size() == uTsIndex ) + + /// Prepare the custom TS and send it + fles::StorableTimeslice component {static_cast<uint32_t>(fdpTimesliceBuffer[uTsIndex]->num_core_microslices()), + fdpTimesliceBuffer[uTsIndex]->index()}; + + for (uint32_t uComp = 0; uComp < fvvCompPerBlock[uBlockIdx].size(); ++uComp) { + uint32_t uNumMsInComp = fdpTimesliceBuffer[uTsIndex]->num_microslices(fvvCompPerBlock[uBlockIdx][uComp]); + component.append_component(uNumMsInComp); + + LOG(debug) << "Add components to TS for Block " << sBlockName << " TS " << fdpTimesliceBuffer[uTsIndex]->index() + << " Comp " << fvvCompPerBlock[uBlockIdx][uComp]; + + for (size_t m = 0; m < uNumMsInComp; ++m) { + component.append_microslice(uComp, m, + fdpTimesliceBuffer[uTsIndex]->descriptor(fvvCompPerBlock[uBlockIdx][uComp], m), + fdpTimesliceBuffer[uTsIndex]->content(fvvCompPerBlock[uBlockIdx][uComp], m)); + } // for( size_t m = 0; m < uNumMsInComp; ++m ) + } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uCompIndex ].size(); ++uComp ) + + LOG(debug) << "Prepared timeslice for Block " << sBlockName << " with " << component.num_components() + << " components"; + + if (!SendData(component)) return false; + + fdbCompSentFlags[uTsIndex][uBlockIdx] = true; + return true; + } // if( (*itKnownBlock).first == sBlockName ) + } // for( auto itKnownBlock = fvBlocksToSend.begin(); itKnownBlock != fvBlocksToSend.end(); ++itKnownBlock) + + /// Should reach here only if the block name was not found in the list! + LOG(error) << "Requested block " << sBlockName << " not found in the list of known blocks"; + return false; +} + +bool CbmMQTsSamplerRepReq::SendData(const fles::StorableTimeslice& component) +{ + // serialize the timeslice and create the message + std::stringstream oss; + boost::archive::binary_oarchive oa(oss); + oa << component; + std::string* strMsg = new std::string(oss.str()); + + FairMQMessagePtr msg(NewMessage( + const_cast<char*>(strMsg->c_str()), // data + strMsg->length(), // size + [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, + strMsg)); // object that manages the data + + // in case of error or transfer interruption, + // return false to go to IDLE state + // successfull transfer will return number of bytes + // transfered (can be 0 if sending an empty message). + if (Send(msg, fsChannelNameTsRequest) < 0) { + LOG(error) << "Problem sending data"; + return false; + } + + fulMessageCounter++; + LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize(); + + return true; +} +bool CbmMQTsSamplerRepReq::SendMissedTsIdx(std::vector<uint64_t> vIndices) +{ + // serialize the vector and create the message + std::stringstream oss; + boost::archive::binary_oarchive oa(oss); + oa << vIndices; + std::string* strMsg = new std::string(oss.str()); + + FairMQMessagePtr msg(NewMessage( + const_cast<char*>(strMsg->c_str()), // data + strMsg->length(), // size + [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, + strMsg)); // object that manages the data + + // in case of error or transfer interruption, + // return false to go to IDLE state + // successfull transfer will return number of bytes + // transfered (can be 0 if sending an empty message). + LOG(debug) << "Send data to channel " << fsChannelNameMissedTs; + if (Send(msg, fsChannelNameMissedTs) < 0) { + LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs; + return false; + } // if( Send( msg, fsChannelNameMissedTs ) < 0 ) + + return true; +} +bool CbmMQTsSamplerRepReq::SendCommand(std::string sCommand) +{ + // serialize the vector and create the message + std::stringstream oss; + boost::archive::binary_oarchive oa(oss); + oa << sCommand; + std::string* strMsg = new std::string(oss.str()); + + FairMQMessagePtr msg(NewMessage( + const_cast<char*>(strMsg->c_str()), // data + strMsg->length(), // size + [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, + strMsg)); // object that manages the data + + // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data + // sCommand.length(), // size + // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); }, + // &sCommand ) ); // object that manages the data + + // in case of error or transfer interruption, + // return false to go to IDLE state + // successfull transfer will return number of bytes + // transfered (can be 0 if sending an empty message). + LOG(debug) << "Send data to channel " << fsChannelNameCommands; + if (Send(msg, fsChannelNameCommands) < 0) { + LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands; + return false; + } // if( Send( msg, fsChannelNameMissedTs ) < 0 ) + + return true; +} +bool CbmMQTsSamplerRepReq::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmMQTsSamplerRepReq::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + ResetHistograms(); + + return true; +} + +bool CbmMQTsSamplerRepReq::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + ResetHistograms(); + + return true; +} + +bool CbmMQTsSamplerRepReq::ResetHistograms() +{ + fhTsRate->Reset(); + fhTsSize->Reset(); + fhTsSizeEvo->Reset(); + fhTsMaxSizeEvo->Reset(); + fhMissedTS->Reset(); + fhMissedTSEvo->Reset(); + + return true; +} + +CbmMQTsSamplerRepReq::~CbmMQTsSamplerRepReq() {} + +void CbmMQTsSamplerRepReq::CalcRuntime() +{ + std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime; + + LOG(info) << "Runtime: " << run_time.count(); + LOG(info) << "No more input data"; +} diff --git a/MQ/source/CbmMQTsSamplerRepReq.h b/MQ/source/CbmMQTsSamplerRepReq.h new file mode 100644 index 0000000000000000000000000000000000000000..cf875e14239417de9d3d119d72081b23364db17b --- /dev/null +++ b/MQ/source/CbmMQTsSamplerRepReq.h @@ -0,0 +1,151 @@ +/** @file CbmMQTsSamplerRepReq.h + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +/******** + * TODO: + * Remove mode "Full TS spreading to multiple outputs" + * Keep track of components sent in split TS mode + * HW mark when sending independent components + * Use exceptions + try/catch instead of boolean return values + ********/ + +#ifndef CBMMQTSSAMPLERREPREQ_H_ +#define CBMMQTSSAMPLERREPREQ_H_ + + +#include "MicrosliceDescriptor.hpp" +#include "StorableTimeslice.hpp" +#include "Timeslice.hpp" +#include "TimesliceSource.hpp" + +#include "FairMQDevice.h" + +class TCanvas; +class TH1F; +class TH1I; +class TProfile; +#include <TObjArray.h> + +#include <ctime> +#include <deque> +#include <string> +#include <utility> +#include <vector> + +class CbmMQTsSamplerRepReq : public FairMQDevice { +public: + CbmMQTsSamplerRepReq(); + virtual ~CbmMQTsSamplerRepReq(); + +protected: + uint64_t fulMaxTimeslices; + + std::string fsFileName = ""; + std::string fsDirName = ""; + + std::vector<std::string> fvsInputFileList = {}; ///< List of input files + std::string fsHost = ""; + uint16_t fusPort = 0; + uint64_t fulHighWaterMark = 10; + + std::string fsChannelNameTsRequest = "ts-request"; + bool fbNoSplitTs = true; + bool fbSendTsPerSysId = false; + bool fbSendTsPerBlock = false; + + std::string fsChannelNameHistosInput = "histogram-in"; + uint32_t fuPublishFreqTs = 0; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5; + + uint64_t fulPrevTsIndex = 0; + uint64_t fulTsCounter = 0; + uint64_t fulMessageCounter = 0; + + virtual void InitTask(); + bool HandleRequest(FairMQMessagePtr&, int); + +private: + void CalcRuntime(); + bool IsChannelNameAllowed(std::string); + + std::unique_ptr<fles::Timeslice> GetNewTs(); + bool AddNewTsInBuffer(); + bool CreateAndSendFullTs(); + bool PrepareCompListPerSysId(); + bool CreateCombinedComponentsPerSysId(std::string sSystemName); + bool CreateCombinedComponentsPerSysId(int iSysId); + bool CreateCombinedComponentsPerSysId(uint uCompIndex); + bool PrepareCompListPerBlock(); + bool CreateCombinedComponentsPerBlock(std::string sBlockName); + + bool SendData(const fles::StorableTimeslice& component); + bool SendMissedTsIdx(std::vector<uint64_t> vIndices); + bool SendCommand(std::string sCommand); + + bool InitHistograms(); + bool SendHistoConfAndData(); + bool SendHistograms(); + bool ResetHistograms(); + + fles::TimesliceSource* fSource = nullptr; //! + std::chrono::steady_clock::time_point fTime; + std::chrono::system_clock::time_point fLastPublishTime; + + + // The vector fAllowedChannels contain the list of defined components names + // which are used for connecting the different devices. A request + // using the name stscomponent will receive timeslices containing the + // sts component only. The corresponding system ids are defined in the + // vector fSysId. + // The Blocks are defined by the user by combining a name with a list of components, + // either by name or by SysId + // A components can only be added to one block, attempts to double book will throw + // an init error + std::vector<std::string> fComponents = {"mvdcomponent", "stscomponent", "richcomponent", "muchcomponent", + "trdcomponent", "tofcomponent", "psdcomponent", "t0component"}; + std::vector<int> fSysId = {0x20, 0x10, 0x30, 0x50, 0x40, 0x60, 0x80, 0x90}; + std::vector<bool> fComponentActive = {false, false, false, false, false, false, false, false}; + + bool fbListCompPerSysIdReady = false; + std::vector<std::vector<uint32_t>> fvvCompPerSysId = {{}, {}, {}, {}, {}, {}, {}, {}}; + + bool fbListCompPerBlockReady = false; + std::vector<std::pair<std::string, std::set<uint16_t>>> fvBlocksToSend = {}; + std::vector<std::vector<uint32_t>> fvvCompPerBlock = {}; + + /// Buffering of partially sent timeslices, limited by fulHighWaterMark + std::deque<std::unique_ptr<fles::Timeslice>> fdpTimesliceBuffer = {}; + std::deque<std::vector<bool>> fdbCompSentFlags = {}; + + std::string fsChannelNameMissedTs = ""; + std::string fsChannelNameCommands = ""; + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; + + /// Histograms + TH1I* fhTsRate = nullptr; + TH1I* fhTsSize = nullptr; + TProfile* fhTsSizeEvo = nullptr; + TH1F* fhTsMaxSizeEvo = nullptr; + TH1I* fhMissedTS = nullptr; + TProfile* fhMissedTSEvo = nullptr; + TCanvas* fcSummary = nullptr; + uint64_t fuStartTime = 0; + double_t fdTimeToStart = 0.; + double_t fdLastMaxTime = 0.; + double_t fdTsMaxSize = 0.; +}; + +#endif /* CBMMQTSASAMPLER_H_ */ diff --git a/MQ/source/CbmMQTsaMultiSampler.h b/MQ/source/CbmMQTsaMultiSampler.h index edac2d47917672af060eb62b40989c57d03753c6..ca71268948cbd8db07a1658402edcda4d8f1b213 100644 --- a/MQ/source/CbmMQTsaMultiSampler.h +++ b/MQ/source/CbmMQTsaMultiSampler.h @@ -72,7 +72,6 @@ private: bool InitHistograms(); bool CheckTimeslice(const fles::Timeslice& ts); void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc); - bool SendData(const fles::StorableTimeslice& component); void CalcRuntime(); bool IsChannelNameAllowed(std::string); bool CreateAndSendComponent(const fles::Timeslice&, int); diff --git a/MQ/source/CbmTsConsumerReqDevExample.cxx b/MQ/source/CbmTsConsumerReqDevExample.cxx new file mode 100644 index 0000000000000000000000000000000000000000..0c3cad94a6b79dec0cd3e2ea0b6b0555403b29d8 --- /dev/null +++ b/MQ/source/CbmTsConsumerReqDevExample.cxx @@ -0,0 +1,334 @@ +/** @file CbmTsConsumerReqDevExample.cxx + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmTsConsumerReqDevExample.h" + +#include "CbmFlesCanvasTools.h" + +#include "StorableTimeslice.hpp" + +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" + +#include "TCanvas.h" +#include "TFile.h" +#include "TH1.h" +#include "TList.h" +#include "TNamed.h" +#include <thread> + +#include "BoostSerializer.h" +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> + +#include "RootSerializer.h" +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; + + +CbmTsConsumerReqDevExample::CbmTsConsumerReqDevExample() +// ALGO: : fMonitorAlgo {new CbmMcbm2018MonitorAlgoT0()} +{ +} + +void CbmTsConsumerReqDevExample::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmMqStarHistoServer."; + fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + + LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; + LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; + LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); +} + +bool CbmTsConsumerReqDevExample::InitContainers() +{ + LOG(info) << "Init parameter containers for CbmTsConsumerReqDevExample."; + + // ALGO: fParCList = fMonitorAlgo->GetParList(); + fParCList = new TList(); + + for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) { + FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC)); + fParCList->Remove(tempObj); + std::string paramName {tempObj->GetName()}; + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + + // Her must come the proper Runid + std::string message = paramName + ",111"; + LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message; + + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + FairParGenericSet* newObj = nullptr; + + if (Send(req, "parameters") > 0) { + if (Receive(rep, "parameters") >= 0) { + if (rep->GetSize() != 0) { + CbmMQTMessage tmsg(rep->GetData(), rep->GetSize()); + newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass())); + LOG(info) << "Received unpack parameter from the server:"; + newObj->print(); + } + else { + LOG(error) << "Received empty reply. Parameter not available"; + } // if (rep->GetSize() != 0) + } // if (Receive(rep, "parameters") >= 0) + } // if (Send(req, "parameters") > 0) + fParCList->AddAt(newObj, iparC); + delete tempObj; + } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) + + /// Apply options to the processing algo + // ALGO: fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs); + + // fMonitorAlgo->AddMsComponentToList(0, 0x90); + + // ALGO: Bool_t initOK = fMonitorAlgo->InitContainers(); + bool initOK = true; + + return initOK; +} + +bool CbmTsConsumerReqDevExample::InitHistograms() +{ + /// Histos creation and obtain pointer on them + /// Trigger histo creation on all associated algos + // ALGO: bool initOK = fMonitorAlgo->CreateHistograms(); + bool initOK = true; + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); + std::vector<std::pair<TNamed*, std::string>> vHistos = {}; + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); + std::vector<std::pair<TCanvas*, std::string>> vCanvases = {}; + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + + +bool CbmTsConsumerReqDevExample::ConditionalRun() +{ + /// First request a new TS (full or single system components or multi-syst components block) + std::string message = fsTsBlockName; + if ("" == message) message = std::to_string(kusSysId); + LOG(debug) << "Requesting new TS by sending message: " << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } // if (Send(req, fsChannelNameDataInput) <= 0) + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } // else if (Receive(rep, fsChannelNameDataInput) < 0) + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } // else if (rep->GetSize() == 0) + + /// Message received, do Algo related Initialization steps if needed + if (0 == fulNumMessages) { + try { + InitContainers(); + } + catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); + } + } // if( 0 == fulNumMessages) + + if (0 == fulNumMessages) InitHistograms(); + + fulNumMessages++; + LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize(); + + if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; + + std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize()); + std::istringstream iss(msgStr); + boost::archive::binary_iarchive inputArchive(iss); + + /// Create an empty TS and fill it with the incoming message + fles::StorableTimeslice component {0}; + inputArchive >> component; + + /// Process the Timeslice + DoUnpack(component, 0); + + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + + return true; +} + +bool CbmTsConsumerReqDevExample::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ALGO: fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + +bool CbmTsConsumerReqDevExample::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ALGO: fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + + +CbmTsConsumerReqDevExample::~CbmTsConsumerReqDevExample() {} + + +Bool_t CbmTsConsumerReqDevExample::DoUnpack(const fles::Timeslice& ts, size_t /*component*/) +{ + fulTsCounter++; + + if (kFALSE == fbComponentsAddedToList) { + for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) { + if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) { + /// Do something here + // ALGO: + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id ) + } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp ) + fbComponentsAddedToList = kTRUE; + } // if( kFALSE == fbComponentsAddedToList ) + + // ALGO: + /* + if (kFALSE == fMonitorAlgo->ProcessTs(ts)) { + LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class"; + return kTRUE; + } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) ) + */ + + /// Clear the digis vector in case it was filled + // ALGO: fMonitorAlgo->ClearVector(); + + if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices"; + + return kTRUE; +} + +void CbmTsConsumerReqDevExample::Finish() {} diff --git a/MQ/source/CbmTsConsumerReqDevExample.h b/MQ/source/CbmTsConsumerReqDevExample.h new file mode 100644 index 0000000000000000000000000000000000000000..2ef12cc3c3ac50df2fc599cb46e3acaf63084b0e --- /dev/null +++ b/MQ/source/CbmTsConsumerReqDevExample.h @@ -0,0 +1,85 @@ +/** @file CbmTsConsumerReqDevExample.h + * @copyright Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#ifndef CBMTSCONSUMERREQDEVEXPL_H_ +#define CBMTSCONSUMERREQDEVEXPL_H_ + +#include "Timeslice.hpp" + +#include "FairMQDevice.h" + +#include "Rtypes.h" +#include "TMessage.h" +#include "TObjArray.h" + +#include <chrono> +#include <map> +#include <vector> + +class TList; + +class CbmTsConsumerReqDevExample : public FairMQDevice { +public: + CbmTsConsumerReqDevExample(); + virtual ~CbmTsConsumerReqDevExample(); + +protected: + virtual void InitTask(); + virtual bool ConditionalRun(); + +private: + /// Constants + static const uint16_t kusSysId = 0xFF; + + /// Control flags + Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice + Bool_t fbComponentsAddedToList = kFALSE; + + /// User settings parameters + std::string fsChannelNameDataInput = "ts-request"; + std::string fsTsBlockName = "exampleblock"; + std::string fsChannelNameHistosInput = "histogram-in"; + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; + + /// Parameters management + TList* fParCList = nullptr; + + /// Statistics & first TS rejection + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); + + /// Processing algo + // ALGO: CbmMcbm2018MonitorAlgoT0* fMonitorAlgo; + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; + + bool InitContainers(); + bool InitHistograms(); + bool DoUnpack(const fles::Timeslice& ts, size_t component); + void Finish(); + bool SendHistoConfAndData(); + bool SendHistograms(); +}; + +// special class to expose protected TMessage constructor +class CbmMQTMessage : public TMessage { +public: + CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); } +}; + + +#endif /* CBMTSCONSUMERREQDEVEXPL_H_ */ diff --git a/MQ/source/runTsConsumerReqExample.cxx b/MQ/source/runTsConsumerReqExample.cxx new file mode 100644 index 0000000000000000000000000000000000000000..9f5b5f3eb7ae3656ed2fff21269cf8820ede214f --- /dev/null +++ b/MQ/source/runTsConsumerReqExample.cxx @@ -0,0 +1,32 @@ +/** @file runTsConsumerReqExample.cxx + * @copyright Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmTsConsumerReqDevExample.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true"); + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"), + "MQ channel name for TS data"); + options.add_options()("TsBlockName", bpo::value<std::string>()->default_value("exampleblock"), + "Block name for requesting TS data, TOF SysId request if empty"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmTsConsumerReqDevExample(); } diff --git a/MQ/source/runTsSamplerRepReq.cxx b/MQ/source/runTsSamplerRepReq.cxx new file mode 100644 index 0000000000000000000000000000000000000000..8e18009d09f2044d788d4ab996f44bd3bf455a66 --- /dev/null +++ b/MQ/source/runTsSamplerRepReq.cxx @@ -0,0 +1,51 @@ +/** @file runTsSamplerRepReq.cxx + * @copyright Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt + * @license SPDX-License-Identifier: GPL-3.0-only + * @authors Pierre-Alain Loizeau [orginator] **/ + +#include "CbmMQTsSamplerRepReq.h" + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file"); + options.add_options()("dirname", bpo::value<std::string>()->default_value(""), + "Directory name where to find the input files"); + options.add_options()("fles-host", bpo::value<std::string>()->default_value(""), + "Host where the timeslice server is running"); + options.add_options()("fles-port", bpo::value<uint16_t>()->default_value(0), + "Port where the timeslice server is running"); + + options.add_options()("max-timeslices", bpo::value<uint64_t>()->default_value(0), + "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)"); + options.add_options()("high-water-mark", bpo::value<uint64_t>()->default_value(1), "High water mark for ZeroMQ"); + + options.add_options()("ChNameTsReq", bpo::value<std::string>()->default_value("ts-request"), + "MQ channel name for TS requests"); + options.add_options()("no-split-ts", bpo::value<bool>()->default_value(0), + "Send a copy of the full TS to single consummer"); + options.add_options()("send-ts-per-sysid", bpo::value<bool>()->default_value(0), + "Send a single TS upon request of a SysId with all matching components"); + options.add_options()("send-ts-per-block", bpo::value<bool>()->default_value(0), + "Send a single TS upon request of a block name with all matching components"); + options.add_options()("block-sysid", bpo::value<std::vector<std::string>>(), + "Pair a block name and SysId in hex, separated by :, unique use of SysId for all blocks!"); + + options.add_options()("ChNameMissTs", bpo::value<std::string>()->default_value(""), + "MQ channel name for missed TS indices"); + options.add_options()("ChNameCmds", bpo::value<std::string>()->default_value(""), + "MQ channel name for commands to slaves"); + + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(0), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsSamplerRepReq(); }