diff --git a/MQ/mcbm/CMakeLists.txt b/MQ/mcbm/CMakeLists.txt index 98b51503f56525b0c12cdea75ca1f88ea6d2542d..5ae68bb7b2c536ebcc368a197a839f7854a73fb6 100644 --- a/MQ/mcbm/CMakeLists.txt +++ b/MQ/mcbm/CMakeLists.txt @@ -1,9 +1,10 @@ -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmPulserMonitor2020.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmPulserMonitor2020.sh) -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmEvtBuilderWin2020.sh) -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQBuildRawEvents.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQBuildRawEvents.sh) -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2021.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmPulserMonitor2020.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmPulserMonitor2020.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmEvtBuilderWin2020.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQBuildRawEvents.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQBuildRawEvents.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2021.sh) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEventsCosmics2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEventsCosmics2021.sh) -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2022.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2022.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2022.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2022.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBmonMoni2022.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBmonMoni2022.sh) set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR} @@ -273,6 +274,35 @@ set(DEPENDENCIES ) GENERATE_EXECUTABLE() + +set(EXE_NAME BmonMonitor) +set(SRCS CbmDeviceBmonMonitor.cxx runBmonMonitor.cxx) + +set(DEPENDENCIES + ${DEPENDENCIES} + ${FAIR_LIBS} + ParBase + ${BOOST_LIBS} + external::fles_ipc + CbmFlibFlesTools + CbmBase + CbmRecoBase + CbmMuchReco + CbmPsdReco + CbmRichReco + CbmRecoSts + CbmTofReco + CbmTrdReco + CbmData + CbmSimSteer # for CbmSetup! + Core + RIO + Net + Hist + RHTTP +) +GENERATE_EXECUTABLE() + # Set the correct variables for the installation set(VMCWORKDIR ${CMAKE_INSTALL_PREFIX}/share/cbmroot) @@ -288,6 +318,7 @@ configure_file(${MY_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in ${TMPDIR}/b configure_file(${MY_SOURCE_DIR}/startMQBuildRawEvents.sh.in ${TMPDIR}/bin/MQ/topologies/install/startMQBuildRawEvents.sh) configure_file(${MY_SOURCE_DIR}/startBuildRawEvents2021.sh.in ${TMPDIR}/bin/MQ/topologies/install/startBuildRawEvents2021.sh) configure_file(${MY_SOURCE_DIR}/startBuildRawEventsCosmics2021.sh.in ${TMPDIR}/bin/MQ/topologies/install/startBuildRawEventsCosmics2021.sh) +configure_file(${MY_SOURCE_DIR}/startBmonMoni2022.sh.in ${TMPDIR}/bin/MQ/topologies/install/startBmonMoni2022.sh) install(PROGRAMS ${TMPDIR}/bin/MQ/topologies/install/startMQMcbmPulserMonitor2020.sh ${TMPDIR}/bin/MQ/topologies/install/startMQMcbmEvtBuilderWin2020.sh diff --git a/MQ/mcbm/CbmDeviceBmonMonitor.cxx b/MQ/mcbm/CbmDeviceBmonMonitor.cxx new file mode 100644 index 0000000000000000000000000000000000000000..38dcf00145c270682a66facfa0caa4301ae763e1 --- /dev/null +++ b/MQ/mcbm/CbmDeviceBmonMonitor.cxx @@ -0,0 +1,605 @@ +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +/** + * CbmDeviceBmonMonitor.cxx + * + * @since 2022-05-23 + * @author P.-A. Loizeau + */ + +#include "CbmDeviceBmonMonitor.h" + +#include "CbmBmonUnpackConfig.h" +#include "CbmFlesCanvasTools.h" +#include "CbmMQDefs.h" +#include "CbmMuchUnpackConfig.h" +#include "CbmPsdUnpackConfig.h" +#include "CbmRichUnpackConfig.h" +#include "CbmSetup.h" +#include "CbmStsUnpackConfig.h" +#include "CbmTofUnpackConfig.h" +#include "CbmTofUnpackMonitor.h" +#include "CbmTrdUnpackConfig.h" +#include "CbmTrdUnpackFaspConfig.h" + +#include "StorableTimeslice.hpp" +#include "TimesliceMetaData.h" + +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" + +#include "TCanvas.h" +#include "TFile.h" +#include "TH1.h" +#include "TList.h" +#include "TNamed.h" + +#include "BoostSerializer.h" +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> +#include <utility> + +#include "RootSerializer.h" +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; + +//Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE; + +CbmDeviceBmonMonitor::CbmDeviceBmonMonitor() {} + +void CbmDeviceBmonMonitor::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmDeviceBmonMonitor."; + fsSetupName = fConfig->GetValue<std::string>("Setup"); + fuRunId = fConfig->GetValue<uint32_t>("RunId"); + fbUnpBmon = fConfig->GetValue<bool>("UnpBmon"); + fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs"); + fbOutputFullTimeSorting = fConfig->GetValue<bool>("FullTimeSort"); + fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>("SetTimeOffs"); + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + // Wrapper defined in CbmMQDefs.h to support different FairMQ versions + cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); +} + +Bool_t CbmDeviceBmonMonitor::InitContainers() +{ + LOG(info) << "Init parameter containers for CbmDeviceBmonMonitor."; + + // ----- FIXME: Environment settings? or binary option? + TString srcDir = std::getenv("VMCWORKDIR"); // top source directory, standard C++ library + // TString srcDir = gSystem->Getenv("VMCWORKDIR"); // top source directory + + // ----- CbmSetup ----------------------------------------------------- + // TODO: support for multiple setups on Par Server? with request containing setup name? + CbmSetup* cbmsetup = CbmSetup::Instance(); + FairMQMessagePtr req(NewSimpleMessage("setup")); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, "parameters") > 0) { + if (Receive(rep, "parameters") >= 0) { + if (0 != rep->GetSize()) { + CbmSetupStorable* exchangableSetup; + + CbmMqTMessage tmsg(rep->GetData(), rep->GetSize()); + exchangableSetup = dynamic_cast<CbmSetupStorable*>(tmsg.ReadObject(tmsg.GetClass())); + + if (nullptr != exchangableSetup) { + /// Prevent clang format single line if + cbmsetup->LoadStoredSetup(exchangableSetup); + } + else { + LOG(error) << "Received corrupt reply. Setup not available"; + throw InitTaskError("Setup not received from par-server."); + } + } // if( 0 != rep->GetSize() ) + else { + LOG(error) << "Received empty reply. Setup not available"; + throw InitTaskError("Setup not received from par-server."); + } // else of if( 0 != rep->GetSize() ) + } // if( Receive( rep, "parameters" ) >= 0) + } // if( Send(req, "parameters") > 0 ) + // ------------------------------------------------------------------------ + + /// Initialize the UnpackerConfigs objects and their "user options" + // ---- BMON ---- + std::shared_ptr<CbmBmonUnpackConfig> bmonconfig = nullptr; + if (fbUnpBmon) { + bmonconfig = std::make_shared<CbmBmonUnpackConfig>("", fuRunId); + if (bmonconfig) { + // bmonconfig->SetDebugState(); + bmonconfig->SetDoWriteOutput(); + // bmonconfig->SetDoWriteOptOutA("CbmBmonErrors"); + std::string parfilesbasepathBmon = Form("%s/macro/beamtime/mcbm2022/", srcDir.Data()); + bmonconfig->SetParFilesBasePath(parfilesbasepathBmon); + bmonconfig->SetParFileName("mBmonCriPar.par"); + bmonconfig->SetSystemTimeOffset(-1220); // [ns] value to be updated + + /// Enable Monitor plots + auto monitor = std::make_shared<CbmTofUnpackMonitor>(); + monitor->SetBmonMode(true); + monitor->SetInternalHttpMode(false); + if (2337 <= fuRunId) { + monitor->SetSpillThreshold(250); + monitor->SetSpillThresholdNonPulser(100); + } + bmonconfig->SetMonitor(monitor); + } + } + // ------------- + + /// Enable full time sorting instead of time sorting per FLIM link + if (bmonconfig) SetUnpackConfig(bmonconfig); + + /// Load time offsets + for (std::vector<std::string>::iterator itStrOffs = fvsSetTimeOffs.begin(); itStrOffs != fvsSetTimeOffs.end(); + ++itStrOffs) { + size_t charPosDel = (*itStrOffs).find(','); + if (std::string::npos == charPosDel) { + LOG(info) << "CbmDeviceBmonMonitor::InitContainers => " + << "Trying to set trigger window with invalid option pattern, ignored! " + << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << (*itStrOffs) << " )"; + } // if( std::string::npos == charPosDel ) + + /// Detector Enum Tag + std::string sSelDet = (*itStrOffs).substr(0, charPosDel); + /// Min number + charPosDel++; + int32_t iOffset = std::stoi((*itStrOffs).substr(charPosDel)); + + if ("kT0" == sSelDet && fBmonConfig) { // + fBmonConfig->SetSystemTimeOffset(iOffset); + } // else if( "kT0" == sSelDet ) + else { + LOG(info) << "CbmDeviceBmonMonitor::InitContainers => Trying to set time " + "offset for unsupported detector, ignored! " + << (sSelDet); + continue; + } // else of detector enum detection + } // for( std::vector< std::string >::iterator itStrAdd = fvsAddDet.begin(); itStrAdd != fvsAddDet.end(); ++itStrAdd ) + + Bool_t initOK = kTRUE; + // --- Bmon + if (fBmonConfig) { + fBmonConfig->InitOutput(); + // RegisterOutputs(ioman, fBmonConfig); /// Framework bound work = kept in this Task + fBmonConfig->SetAlgo(); + fBmonConfig->LoadParFileName(); /// Needed to change the Parameter file name before it is used!!! + initOK &= InitParameters(fBmonConfig->GetParContainerRequest()); /// Framework bound work = kept in this Device + fBmonConfig->InitAlgo(); + // initPerformanceMaps(fkFlesBmon, "Bmon"); + } + + /// Event header object + fCbmTsEventHeader = new CbmTsEventHeader(); + + return initOK; +} + +Bool_t +CbmDeviceBmonMonitor::InitParameters(std::vector<std::pair<std::string, std::shared_ptr<FairParGenericSet>>>* reqparvec) +{ + LOG(info) << "CbmDeviceBmonMonitor::InitParameters"; + if (!reqparvec) { + LOG(info) << "CbmDeviceBmonMonitor::InitParameters - empty requirements vector no parameters initialized."; + return kTRUE; + } + + // Now get the actual ascii files and init the containers with the asciiIo + for (auto& pair : *reqparvec) { + /* + auto filepath = pair.first; + auto parset = pair.second; + FairParAsciiFileIo asciiInput; + if (!filepath.empty()) { + if (asciiInput.open(filepath.data())) { parset->init(&asciiInput); } + } + * */ + std::string paramName {pair.second->GetName()}; + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + + // Here must come the proper Runid + std::string message = paramName + ",111"; + LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message; + + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + FairParGenericSet* newObj = nullptr; + + if (Send(req, "parameters") > 0) { + if (Receive(rep, "parameters") >= 0) { + if (0 != rep->GetSize()) { + CbmMqTMessage tmsg(rep->GetData(), rep->GetSize()); + newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass())); + LOG(info) << "Received unpack parameter from the server: " << newObj->GetName(); + newObj->print(); + } // if( 0 != rep->GetSize() ) + else { + LOG(error) << "Received empty reply. Parameter not available"; + return kFALSE; + } // else of if( 0 != rep->GetSize() ) + } // if( Receive( rep, "parameters" ) >= 0) + } // if( Send(req, "parameters") > 0 ) + pair.second.reset(newObj); /// Potentially unsafe reasignment of raw pointer to the shared pointer? + //delete newObj; + } + return kTRUE; +} + +bool CbmDeviceBmonMonitor::InitHistograms() +{ + /// Histos creation and obtain pointer on them + /// Trigger histo creation on all associated algos + // ALGO: bool initOK = fMonitorAlgo->CreateHistograms(); + bool initOK = true; + + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); + std::vector<std::pair<TNamed*, std::string>> vHistos = fBmonConfig->GetMonitor()->GetHistoVector(); + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); + std::vector<std::pair<TCanvas*, std::string>> vCanvases = fBmonConfig->GetMonitor()->GetCanvasVector(); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { + // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() + // << " in " << vHistos[ uHisto ].second.data() + // ; + fArrayHisto.Add(vHistos[uHisto].first); + std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); + fvpsHistosFolder.push_back(psHistoConfig); + + LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { + // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() + // << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[uCanv].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); + + std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); + + fvpsCanvasConfig.push_back(psCanvConfig); + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + + return initOK; +} + +// Method called by run loop and requesting new data from the TS source whenever +bool CbmDeviceBmonMonitor::ConditionalRun() +{ + /// First do Algo related Initialization steps if needed + if (0 == fulNumMessages) { + try { + InitContainers(); + } + catch (InitTaskError& e) { + LOG(error) << e.what(); + ChangeState(fair::mq::Transition::ErrorFound); + } + } // if( 0 == fulNumMessages) + + if (0 == fulNumMessages) InitHistograms(); + + /// If first TS of this device, ask for the start time (lead to skip of 1 TS for 1st request) + if (!fbStartTimeSet) { + /// Request the start time + std::string message = "SendFirstTimesliceIndex"; + LOG(debug) << "Requesting start time by sending message: SendFirstTimesliceIndex" << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } // if (Send(req, fsChannelNameDataInput) <= 0) + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } // else if (Receive(rep, fsChannelNameDataInput) < 0) + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } // else if (rep->GetSize() == 0) + std::string sReply; + std::string msgStrRep(static_cast<char*>(rep->GetData()), rep->GetSize()); + std::istringstream issRep(msgStrRep); + boost::archive::binary_iarchive inputArchiveRep(issRep); + inputArchiveRep >> sReply; + + fBmonConfig->GetMonitor()->SetHistosStartTime((1e-9) * static_cast<double>(std::stoul(sReply))); + fbStartTimeSet = true; + } + + /// First request a new TS (full one) + std::string message = "full"; + LOG(debug) << "Requesting new TS by sending message: full" << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } // if (Send(req, fsChannelNameDataInput) <= 0) + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } // else if (Receive(rep, fsChannelNameDataInput) < 0) + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } // else if (rep->GetSize() == 0) + + fulNumMessages++; + LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize(); + + if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; + + std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize()); + std::istringstream iss(msgStr); + boost::archive::binary_iarchive inputArchive(iss); + + /// Create an empty TS and fill it with the incoming message + fles::StorableTimeslice ts {0}; + inputArchive >> ts; + + /// On first TS, extract the TS parameters from header (by definition stable over time) + if (-1.0 == fdTsCoreSizeInNs) { + fuNbCoreMsPerTs = ts.num_core_microslices(); + fuNbOverMsPerTs = ts.num_microslices(0) - ts.num_core_microslices(); + fdMsSizeInNs = (ts.descriptor(0, fuNbCoreMsPerTs).idx - ts.descriptor(0, 0).idx) / fuNbCoreMsPerTs; + fdTsCoreSizeInNs = fdMsSizeInNs * (fuNbCoreMsPerTs); + fdTsOverSizeInNs = fdMsSizeInNs * (fuNbOverMsPerTs); + fdTsFullSizeInNs = fdTsCoreSizeInNs + fdTsOverSizeInNs; + LOG(info) << "Timeslice parameters: each TS has " << fuNbCoreMsPerTs << " Core MS and " << fuNbOverMsPerTs + << " Overlap MS, for a MS duration of " << fdMsSizeInNs << " ns, a core duration of " << fdTsCoreSizeInNs + << " ns and a full duration of " << fdTsFullSizeInNs << " ns"; + fTsMetaData = new TimesliceMetaData(ts.descriptor(0, 0).idx, fdTsCoreSizeInNs, fdTsOverSizeInNs, ts.index()); + } // if( -1.0 == fdTsCoreSizeInNs ) + else { + /// Update only the fields changing from TS to TS + fTsMetaData->SetStartTime(ts.descriptor(0, 0).idx); + fTsMetaData->SetIndex(ts.index()); + } + + /// Process the Timeslice + DoUnpack(ts, 0); + + // Reset the event header for a new timeslice + fCbmTsEventHeader->Reset(); + + // Reset the unpackers for a new timeslice, e.g. clear the output vectors + // ---- Bmon ---- + if (fBmonConfig) fBmonConfig->Reset(); + + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + if (!fbConfigSent) { + // Send the configuration only once per run! + fbConfigSent = SendHistoConfAndData(); + } // if( !fbConfigSent ) + else + SendHistograms(); + + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + + return true; +} + +bool CbmDeviceBmonMonitor::SendUnpData() +{ + FairMQParts parts; + + /// Prepare serialized versions of the TS Event header + FairMQMessagePtr messTsHeader(NewMessage()); + // Serialize<RootSerializer>(*messTsHeader, fCbmTsEventHeader); + RootSerializer().Serialize(*messTsHeader, fCbmTsEventHeader); + + parts.AddPart(std::move(messTsHeader)); + + // ---- T0 ---- + std::stringstream ossBmon; + boost::archive::binary_oarchive oaBmon(ossBmon); + if (fBmonConfig) { // + oaBmon << *(fBmonConfig->GetOutputVec()); + } + else { + oaBmon << (std::vector<CbmTofDigi>()); + } + std::string* strMsgBmon = new std::string(ossBmon.str()); + + parts.AddPart(NewMessage( + const_cast<char*>(strMsgBmon->c_str()), // data + strMsgBmon->length(), // size + [](void*, void* object) { delete static_cast<std::string*>(object); }, + strMsgBmon)); // object that manages the data + + /// Prepare serialized versions of the TS Meta + /// FIXME: only for TS duration and overlap, should be sent to parameter service instead as stable values in run + /// Index and start time are already included in the TsHeader object! + FairMQMessagePtr messTsMeta(NewMessage()); + // Serialize<RootSerializer>(*messTsMeta, fTsMetaData); + RootSerializer().Serialize(*messTsMeta, fTsMetaData); + parts.AddPart(std::move(messTsMeta)); + + if (Send(parts, fsChannelNameDataOutput) < 0) { + LOG(error) << "Problem sending data to " << fsChannelNameDataOutput; + return false; + } + + return true; +} + + +bool CbmDeviceBmonMonitor::SendHistoConfAndData() +{ + /// Prepare multiparts message and header + std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); + FairMQMessagePtr messageHeader(NewMessage()); + // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader); + BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader); + FairMQParts partsOut; + partsOut.AddPart(std::move(messageHeader)); + + for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist(NewMessage()); + // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]); + BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]); + + partsOut.AddPart(std::move(messageHist)); + } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) + + /// Catch case where no histos are registered! + /// => Add empty message + if (0 == fvpsHistosFolder.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + + for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan(NewMessage()); + // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]); + BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]); + + partsOut.AddPart(std::move(messageCan)); + } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) + + /// Catch case where no Canvases are registered! + /// => Add empty message + if (0 == fvpsCanvasConfig.size()) { + FairMQMessagePtr messageHist(NewMessage()); + partsOut.AddPart(std::move(messageHist)); + } + + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr msgHistos(NewMessage()); + // Serialize<RootSerializer>(*msgHistos, &fArrayHisto); + RootSerializer().Serialize(*msgHistos, &fArrayHisto); + partsOut.AddPart(std::move(msgHistos)); + + /// Send the multi-parts message to the common histogram messages queue + if (Send(partsOut, fsChannelNameHistosInput) < 0) { + LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data"; + return false; + } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ALGO: fMonitorAlgo->ResetHistograms(kFALSE); + fBmonConfig->GetMonitor()->ResetHistograms(); + fBmonConfig->GetMonitor()->ResetBmonHistograms(kFALSE); + + return true; +} + +bool CbmDeviceBmonMonitor::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + // Serialize<RootSerializer>(*message, &fArrayHisto); + RootSerializer().Serialize(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // ALGO: fMonitorAlgo->ResetHistograms(kFALSE); + + return true; +} + + +CbmDeviceBmonMonitor::~CbmDeviceBmonMonitor() +{ + if (fBmonConfig) fBmonConfig->GetUnpacker()->Finish(); +} + +Bool_t CbmDeviceBmonMonitor::DoUnpack(const fles::Timeslice& ts, size_t /*component*/) +{ + fulTsCounter++; + // Prepare timeslice + // const fles::Timeslice& timeslice = *ts; + + fCbmTsEventHeader->SetTsIndex(ts.index()); + fCbmTsEventHeader->SetTsStartTime(ts.start_time()); + + uint64_t nComponents = ts.num_components(); + // if (fDoDebugPrints) LOG(info) << "Unpack: TS index " << ts.index() << " components " << nComponents; + LOG(debug) << "Unpack: TS index " << ts.index() << " components " << nComponents; + + for (uint64_t component = 0; component < nComponents; component++) { + auto systemId = static_cast<std::uint16_t>(ts.descriptor(component, 0).sys_id); + + switch (systemId) { + case fkFlesBmon: { + if (fBmonConfig) { + fCbmTsEventHeader->AddNDigisBmon( + unpack(systemId, &ts, component, fBmonConfig, fBmonConfig->GetOptOutAVec(), fBmonConfig->GetOptOutBVec())); + } + break; + } + default: { + if (fDoDebugPrints) LOG(error) << "Unpack: Unknown system ID " << systemId << " for component " << component; + break; + } + } + } + + if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices"; + + return kTRUE; +} +/** + * @brief Get the Trd Spadic + * @return std::shared_ptr<CbmTrdSpadic> +*/ +std::shared_ptr<CbmTrdSpadic> CbmDeviceBmonMonitor::GetTrdSpadic(bool useAvgBaseline) +{ + auto spadic = std::make_shared<CbmTrdSpadic>(); + spadic->SetUseBaselineAverage(useAvgBaseline); + spadic->SetMaxAdcToEnergyCal(1.0); + + return spadic; +} + +void CbmDeviceBmonMonitor::Finish() {} diff --git a/MQ/mcbm/CbmDeviceBmonMonitor.h b/MQ/mcbm/CbmDeviceBmonMonitor.h new file mode 100644 index 0000000000000000000000000000000000000000..fc607dc8cb6b983c202646d58db2adeb48ed4f2a --- /dev/null +++ b/MQ/mcbm/CbmDeviceBmonMonitor.h @@ -0,0 +1,256 @@ +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +/** + * CbmDeviceBmonMonitor.h + * + * @since 2022-05-23 + * @author P.-A. Loizeau + */ + +#ifndef CBMDEVICEBMONMONI_H_ +#define CBMDEVICEBMONMONI_H_ + +#include "CbmMqTMessage.h" +#include "CbmTsEventHeader.h" + +#include "Timeslice.hpp" + +#include "FairMQDevice.h" +#include "FairParGenericSet.h" + +#include "Rtypes.h" +#include "TObjArray.h" + +#include <chrono> +#include <map> +#include <vector> + +class TList; +class CbmBmonUnpackConfig; + +class TimesliceMetaData; + +class CbmTrdSpadic; + +class CbmDeviceBmonMonitor : public FairMQDevice { +public: + CbmDeviceBmonMonitor(); + virtual ~CbmDeviceBmonMonitor(); + +protected: + virtual void InitTask(); + bool ConditionalRun(); + bool HandleCommand(FairMQMessagePtr&, int); + + /** @brief Set the Bmon Unpack Config @param config */ + void SetUnpackConfig(std::shared_ptr<CbmBmonUnpackConfig> config) { fBmonConfig = config; } + +private: + /// Constants + static constexpr std::uint16_t fkFlesBmon = static_cast<std::uint16_t>(fles::SubsystemIdentifier::T0); + + + /// Control flags + Bool_t fbIgnoreOverlapMs = false; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice + Bool_t fbComponentsAddedToList = kFALSE; + bool fbStartTimeSet = false; + + /** @brief Flag if extended debug output is to be printed or not*/ + bool fDoDebugPrints = false; //! + /** @brief Flag if performance profiling should be activated or not.*/ + bool fDoPerfProf = false; //! + /** @brief Flag to Enable/disable a full time sorting. If off, time sorting happens per link/FLIM source */ + bool fbOutputFullTimeSorting = false; + + /// User settings parameters + std::string fsSetupName = "mcbm_beam_2021_07_surveyed"; + uint32_t fuRunId = 1588; + /// ---> for selective unpacking + bool fbUnpBmon = true; + /// message queues + std::string fsChannelNameDataInput = "ts-request"; + std::string fsChannelNameDataOutput = "unpts_0"; + std::string fsChannelNameCommands = "commands"; + std::string fsChannelNameHistosInput = "histogram-in"; + /// Histograms management + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; + + /// Parameters management + // TList* fParCList = nullptr; + Bool_t InitParameters(std::vector<std::pair<std::string, std::shared_ptr<FairParGenericSet>>>* reqparvec); + + /// Statistics & first TS rejection + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); + /** @brief Map to store a name for the unpackers and the processed amount of digis, key = fkFlesId*/ + std::map<std::uint16_t, std::pair<std::string, size_t>> fNameMap = {}; //! + /** @brief Map to store the cpu and wall time, key = fkFlesId*/ + std::map<std::uint16_t, std::pair<double, double>> fTimeMap = {}; //! + /** @brief Map to store the in and out data amount, key = fkFlesId*/ + std::map<std::uint16_t, std::pair<double, double>> fDataSizeMap = {}; //! + + /// Configuration of the unpackers. Provides the configured algorithm + std::shared_ptr<CbmBmonUnpackConfig> fBmonConfig = nullptr; + + /// Pointer to the Timeslice header conatining start time and index + CbmTsEventHeader* fCbmTsEventHeader = nullptr; + + /// Time offsets + std::vector<std::string> fvsSetTimeOffs = {}; + + /// TS MetaData storage: stable so should be moved somehow to parameters handling (not transmitted with each TS + size_t fuNbCoreMsPerTs = 0; //! + size_t fuNbOverMsPerTs = 0; //! + Double_t fdMsSizeInNs = 0; //! Size of a single MS, [nanoseconds] + Double_t fdTsCoreSizeInNs = -1.0; //! Total size of the core MS in a TS, [nanoseconds] + Double_t fdTsOverSizeInNs = -1.0; //! Total size of the overlap MS in a TS, [nanoseconds] + Double_t fdTsFullSizeInNs = -1.0; //! Total size of all MS in a TS, [nanoseconds] + TimesliceMetaData* fTsMetaData; + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + /// Flag indicating whether the histograms and canvases configurations were already published + bool fbConfigSent = false; + + Bool_t InitContainers(); + bool InitHistograms(); + Bool_t DoUnpack(const fles::Timeslice& ts, size_t component); + void Finish(); + bool SendUnpData(); + bool SendHistoConfAndData(); + bool SendHistograms(); + + std::shared_ptr<CbmTrdSpadic> GetTrdSpadic(bool useAvgBaseline); + + /** @brief Sort a vector timewise vector type has to provide GetTime() */ + template<typename TVecobj> + typename std::enable_if<std::is_same<TVecobj, std::nullptr_t>::value == true, void>::type + timesort(std::vector<TVecobj>* /*vec = nullptr*/) + { + LOG(debug) + << "CbmDeviceBmonMonitor::timesort() got an object that has no member function GetTime(). Hence, we can and " + "will not timesort it!"; + } + + template<typename TVecobj> + typename std::enable_if<!std::is_member_function_pointer<decltype(&TVecobj::GetTime)>::value, void>::type + timesort(std::vector<TVecobj>* /*vec = nullptr*/) + { + LOG(debug) << "CbmDeviceBmonMonitor::timesort() " << TVecobj::Class_Name() + << "is an object that has no member function GetTime(). Hence, we can and " + "will not timesort it!"; + } + + template<typename TVecobj> + typename std::enable_if<std::is_member_function_pointer<decltype(&TVecobj::GetTime)>::value, void>::type + timesort(std::vector<TVecobj>* vec = nullptr) + { + if (vec == nullptr) return; + std::sort(vec->begin(), vec->end(), + [](const TVecobj& a, const TVecobj& b) -> bool { return a.GetTime() < b.GetTime(); }); + } + + /** + * @brief Template for the unpacking call of a given algorithm. + * + * @tparam TAlgo Algorithm to be called + * @tparam TOutput Output element types + * @tparam TOptoutputs Optional output element types + * @param ts Timeslice + * @param icomp Component number + * @param algo Algorithm to be used for this component + * @param outtargetvec Target vector for the output elements + * @param optoutputvecs Target vectors for optional outputs + * @return std::pair<ndigis, std::pair<cputime, walltime>> + */ + template<class TConfig, class TOptOutA = std::nullptr_t, class TOptOutB = std::nullptr_t> + size_t unpack(const std::uint16_t subsysid, const fles::Timeslice* ts, std::uint16_t icomp, TConfig config, + std::vector<TOptOutA>* optouttargetvecA = nullptr, std::vector<TOptOutB>* optouttargetvecB = nullptr) + { + + auto wallstarttime = std::chrono::high_resolution_clock::now(); + std::clock_t cpustarttime = std::clock(); + + auto algo = config->GetUnpacker(); + std::vector<TOptOutA> optoutAvec = {}; + std::vector<TOptOutB> optoutBvec = {}; + if (optouttargetvecA) { algo->SetOptOutAVec(&optoutAvec); } + if (optouttargetvecB) { algo->SetOptOutBVec(&optoutBvec); } + + // Set the start time of the current TS for this algorithm + algo->SetTsStartTime(ts->start_time()); + + // Run the actual unpacking + auto digivec = algo->Unpack(ts, icomp); + + // Check if we want to write the output to somewhere (in pure online monitoring mode for example this can/would/should be skipped) + if (config->GetOutputVec()) { + // Lets do some time-sorting if we are not doing it later + if (!fbOutputFullTimeSorting) timesort(&digivec); + + // Transfer the data from the timeslice vector to the target branch vector + // Digis/default output retrieved as offered by the algorithm + for (auto digi : digivec) + config->GetOutputVec()->emplace_back(digi); + } + if (optouttargetvecA) { + // Lets do some timesorting + if (!fbOutputFullTimeSorting) timesort(&optoutAvec); + // Transfer the data from the timeslice vector to the target branch vector + for (auto optoutA : optoutAvec) + optouttargetvecA->emplace_back(optoutA); + } + if (optouttargetvecB) { + // Second opt output is not time sorted to allow non GetTime data container. + // Lets do some timesorting + timesort(&optoutAvec); + // Transfer the data from the timeslice vector to the target branch vector + for (auto optoutB : optoutBvec) + optouttargetvecB->emplace_back(optoutB); + } + + std::clock_t cpuendtime = std::clock(); + auto wallendtime = std::chrono::high_resolution_clock::now(); + + // Cpu time in [µs] + auto cputime = 1e6 * (cpuendtime - cpustarttime) / CLOCKS_PER_SEC; + algo->AddCpuTime(cputime); + // Real time in [µs] + auto walltime = std::chrono::duration<double, std::micro>(wallendtime - wallstarttime).count(); + algo->AddWallTime(walltime); + + + // Check some numbers from this timeslice + size_t nDigis = digivec.size(); + LOG(debug) << "Component " << icomp << " connected to config " << config->GetName() << " n-Digis " << nDigis + << " processed in walltime(cputime) = " << walltime << "(" << cputime << cputime << ") µs" + << "this timeslice."; + + if (fDoPerfProf) { + auto timeit = fTimeMap.find(subsysid); + timeit->second.first += cputime; + timeit->second.second += walltime; + + auto datait = fDataSizeMap.find(subsysid); + datait->second.first += ts->size_component(icomp) / 1.0e6; + datait->second.second += nDigis * algo->GetOutputObjSize() / 1.0e6; + + fNameMap.find(subsysid)->second.second += nDigis; + } + + return nDigis; + } +}; + +#endif /* CBMDEVICEMCBMUNPACK_H_ */ diff --git a/MQ/mcbm/runBmonMonitor.cxx b/MQ/mcbm/runBmonMonitor.cxx new file mode 100644 index 0000000000000000000000000000000000000000..43177e5238d933b04558b062238c2fadf3531f9c --- /dev/null +++ b/MQ/mcbm/runBmonMonitor.cxx @@ -0,0 +1,41 @@ +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +#include "CbmDeviceBmonMonitor.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("Setup", bpo::value<std::string>()->default_value("mcbm_beam_2021_07_surveyed"), + "Name/tag of the geomatry setup"); + options.add_options()("RunId", bpo::value<uint32_t>()->default_value(1588), "Run ID"); + options.add_options()("UnpBmon", bpo::value<bool>()->default_value(false), "Enable Bmon unpacking if true"); + options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true"); + options.add_options()("FullTimeSort", bpo::value<bool>()->default_value(true), + "Full time sorting per detector before sending output array"); + options.add_options()("SetTimeOffs", bpo::value<std::vector<std::string>>()->multitoken()->composing(), + "Set time offset in ns for selected detector, use string matching " + "ECbmModuleId,dOffs e.g. kTof,-35.2"); + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"), + "MQ channel name for raw TS data"); + options.add_options()("TsNameOut", bpo::value<std::string>()->default_value("unpts_0"), + "MQ channel name for unpacked TS data"); + + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(0), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceBmonMonitor(); } diff --git a/MQ/mcbm/startBmonMoni2022.sh.in b/MQ/mcbm/startBmonMoni2022.sh.in new file mode 100755 index 0000000000000000000000000000000000000000..10c9f25dbd67f07babf2820e0271b97cd44fa48b --- /dev/null +++ b/MQ/mcbm/startBmonMoni2022.sh.in @@ -0,0 +1,235 @@ +#!/bin/bash + +if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then + @SIMPATH@/bin/fairmq-shmmonitor --cleanup +fi + +if [ $# -ge 2 ]; then + _nbbranch=$1 + _run_id=$2 + ((_pubfreqts = $_nbbranch*100 )) + _pubminsec=1.0 + _pubmaxsec=10.0 + + if [ $# -ge 5 ]; then + _filename="" + _dirname="" + _hostname=$5 + + if [ $# -ge 6 ]; then + _pubfreqts=$6 + + if [ $# -ge 7 ]; then + _pubminsec=$7 + + if [ $# -ge 8 ]; then + _pubmaxsec=$8 + fi + fi + fi + elif [ $# -ge 3 ]; then + _filename=$3 + _hostname="" + if [ $# -eq 4 ]; then + _dirname=$4 + else + _dirname="" + fi + else + echo 'Missing parameters or wrong number of parameters.' + echo 'Possible usages are:' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <full filename pattern list>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <filename pattern> <folder_path>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>' + + return -1 + fi +else + echo 'Missing parameters. At least the number of branches and the trigger set are required' + echo 'Possible usages are:' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <full filename pattern list>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <filename pattern> <folder_path>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>' + echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>' + + return -1 +fi + +_parfileSts=@VMCWORKDIR@/macro/beamtime/mcbm2022/mStsPar.par +_parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2022/mMuchPar.par # Valid from 2163 +_parfileTrdAsic=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.asic.par +_parfileTrdDigi=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.digi.par +_parfileTrdGas=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.gas.par +_parfileTrdGain=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.gain.par +_parfileTof=@VMCWORKDIR@/macro/beamtime/mcbm2022/mTofCriPar.par +_parfileBmon=@VMCWORKDIR@/macro/beamtime/mcbm2022/mBmonCriPar.par +_parfileRich=@VMCWORKDIR@/macro/beamtime/mcbm2021/mRichPar_70.par +_parfilePsd=@VMCWORKDIR@/macro/beamtime/mcbm2021/mPsdPar.par +_setup_name=mcbm_beam_2022_03_22_iron + +if [ $_run_id -ge 2060 ]; then + if [ $_run_id -le 2065 ]; then + _setup_name=mcbm_beam_2022_03_09_carbon + _parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2022/mMuchParUpto26032022.par # Valid for runs 2060-2162 + elif [ $_run_id -le 2160 ]; then # Potentially wrong setup between 2065 and 2150 but not official runs + _setup_name=mcbm_beam_2022_03_22_iron + _parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2022/mMuchParUpto26032022.par # Valid for runs 2060-2162 + elif [ $_run_id -le 2310 ]; then # Potentially wrong setup between 2160 and 2176 but not official runs + _setup_name=mcbm_beam_2022_03_28_uranium + fi +fi + + +_ratelog=0 # hides ZMQ messages rates and bandwidth +#_ratelog=1 # display ZMQ messages rates and bandwidth + +LOGFILETAG=`hostname` +LOGFILETAG+="_" +LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S` +LOGFILETAG+=".log" + +# Compute limits of TOF selection/trigger window +_TofL=$_TofMean +_TofH=$_TofMean +(( _TofL -= _TofWin)) +(( _TofH += _TofWin)) +echo Tof window $_TofL - $_TofH + +LIST_OF_PIDS="" + +(( _paraBuffSz=100 )) +(( _singBuffSz=_paraBuffSz*_nbbranch )) + +echo "Buffer size for parallel devices $_paraBuffSz" +echo "Buffer size for singleton devices $_singBuffSz" + + +SAMPLER="RepReqTsSampler" +SAMPLER+=" --control static" +SAMPLER+=" --id sampler1" +#SAMPLER+=" --max-timeslices 0" +#SAMPLER+=" --max-timeslices 10" +#SAMPLER+=" --max-timeslices 30" +#SAMPLER+=" --max-timeslices 100" +#SAMPLER+=" --max-timeslices 300" +#SAMPLER+=" --max-timeslices 1000" +SAMPLER+=" --max-timeslices -1" +#SAMPLER+=" --severity info" +#SAMPLER+=" --flib-port 10" +if [ "$_hostname" != "" ]; then + SAMPLER+=" --fles-host $_hostname" +elif [ "$_filename" != "" ]; then + SAMPLER+=" --filename $_filename" + if [ "$_dirname" != "" ]; then + SAMPLER+=" --dirname $_dirname" + fi +fi +SAMPLER+=" --high-water-mark 10" +SAMPLER+=" --no-split-ts 1" +SAMPLER+=" --ChNameMissTs missedts" +SAMPLER+=" --ChNameCmds commands" +SAMPLER+=" --PubFreqTs $_pubfreqts" +SAMPLER+=" --PubTimeMin $_pubminsec" +SAMPLER+=" --PubTimeMax $_pubmaxsec" +SAMPLER+=" --channel-config name=ts-request,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11555,rateLogging=$_ratelog" +SAMPLER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666,rateLogging=$_ratelog" +SAMPLER+=" --channel-config name=missedts,type=pub,method=bind,address=tcp://127.0.0.1:11006,rateLogging=$_ratelog" +SAMPLER+=" --channel-config name=commands,type=pub,method=bind,address=tcp://127.0.0.1:11007,rateLogging=$_ratelog" +SAMPLER+=" --transport zeromq" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +SAMPLER_LOG="sampler1_$LOGFILETAG" +# xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER & +nohup @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &> $SAMPLER_LOG & + +echo $SAMPLER + +LIST_OF_PIDS+=$! +LIST_OF_PIDS+=" " + +PARAMETERSERVER="parmq-server" +PARAMETERSERVER+=" --control static" +PARAMETERSERVER+=" --id parmq-server" +PARAMETERSERVER+=" --severity info" +PARAMETERSERVER+=" --channel-name parameters" +PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" +PARAMETERSERVER+=" --first-input-name $_parfileSts;$_parfileMuch;$_parfileTrdAsic;$_parfileTrdDigi;$_parfileTrdGas;$_parfileTrdGain;$_parfileTof;$_parfileBmon;$_parfileRich;$_parfilePsd" +PARAMETERSERVER+=" --first-input-type ASCII" +#PARAMETERSERVER+=" --libs-to-load=CbmStsBase;CbmMuchBase;CbmTrdBase;CbmTofBase;CbmRichBase" # doesn't work due to runtime problem +PARAMETERSERVER+=" --setup $_setup_name" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +PARAMSRV_LOG="parmq_$LOGFILETAG" +# xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1600+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER & +nohup @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &> $PARAMSRV_LOG & + +LIST_OF_PIDS+=$! +LIST_OF_PIDS+=" " + +HISTSERVER="MqHistoServer" +HISTSERVER+=" --control static" +HISTSERVER+=" --id server1" +HISTSERVER+=" --severity info" +HISTSERVER+=" --histport 8091" +HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666,rateLogging=$_ratelog" +HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0" +HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +HISTSRV_LOG="server1_$LOGFILETAG" +# xterm -l -lf $HISTSRV_LOG -geometry 80x23+2000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER & +nohup @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &> $HISTSRV_LOG & + +LIST_OF_PIDS+=$! +LIST_OF_PIDS+=" " + +sleep 1 + +_iBranch=0 +while (( _iBranch < _nbbranch )); do + (( _yOffset=200*_iBranch )) + (( _iBranch += 1 )) + (( _iPort = 11680 + _iBranch )) + + UNPACKER="BmonMonitor" + UNPACKER+=" --control static" + UNPACKER+=" --id unp$_iBranch" + #UNPACKER+=" --severity error" + UNPACKER+=" --severity info" + #UNPACKER+=" --severity debug" + UNPACKER+=" --Setup $_setup_name" + UNPACKER+=" --RunId $_run_id" + UNPACKER+=" --IgnOverMs 1" + UNPACKER+=" --UnpBmon true" + UNPACKER+=" --SetTimeOffs kT0,0" + UNPACKER+=" --PubFreqTs $_pubfreqts" + UNPACKER+=" --PubTimeMin $_pubminsec" + UNPACKER+=" --PubTimeMax $_pubmaxsec" + UNPACKER+=" --TsNameOut unpts$_iBranch" + UNPACKER+=" --channel-config name=ts-request,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11555,rateLogging=$_ratelog" + UNPACKER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" + UNPACKER+=" --channel-config name=unpts$_iBranch,type=push,method=bind,transport=zeromq,sndBufSize=2,address=tcp://127.0.0.1:$_iPort,rateLogging=$_ratelog" +# UNPACKER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" + UNPACKER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666,rateLogging=$_ratelog" + UNPACKER+=" --transport zeromq" + # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX + # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log + UNPACKER_LOG="unp$_iBranch" + UNPACKER_LOG+="_$LOGFILETAG" + # xterm -l -lf $UNPACKER_LOG -geometry 132x23+400+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$UNPACKER & + nohup @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$UNPACKER &> $UNPACKER_LOG & + + LIST_OF_PIDS+=$! + LIST_OF_PIDS+=" " +done + +LIST_OF_PIDS+=$! +LIST_OF_PIDS+=" " + +PID_LOG="pids_$LOGFILETAG" +echo $LIST_OF_PIDS &> $PID_LOG diff --git a/MQ/source/CbmMQTsSamplerRepReq.cxx b/MQ/source/CbmMQTsSamplerRepReq.cxx index 61decdbeb490376c8a0b21f49e24152a46f6d6e6..5872da9caadbe9c2fc190883e463d84b91e45129 100644 --- a/MQ/source/CbmMQTsSamplerRepReq.cxx +++ b/MQ/source/CbmMQTsSamplerRepReq.cxx @@ -359,6 +359,18 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) return true; } + /// TODO: add support for alternative request with "system name" instead of "system ID" + std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize()); + if ("SendFirstTimesliceIndex" == reqStr) { + if (0 == fulFirstTsIndex) { // + GetNewTs(); + } + if (!SendFirstTsIndex() && !fbEofFound) { // + return false; + } + return true; + } + if (fbNoSplitTs) { if (!CreateAndSendFullTs() && !fbEofFound) { @@ -374,7 +386,6 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) } // if( fbNoSplitTs ) else if (fbSendTsPerSysId) { /// TODO: add support for alternative request with "system name" instead of "system ID" - std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize()); int iSysId = std::stoi(reqStr); LOG(debug) << "Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec; @@ -392,7 +403,6 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int) } // if(!CreateAndCombineComponentsPerSysId(iSysId) && !fbEofFound) } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs else if (fbSendTsPerBlock) { - std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize()); LOG(debug) << "Received TS components block request from client: " << reqStr; /// This assumes that the order of the components does NOT change after the first TS @@ -443,9 +453,15 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs() const fles::Timeslice& ts = *timeslice; uint64_t uTsIndex = ts.index(); + if (0 == fulFirstTsIndex) { // + fulFirstTsIndex = ts.descriptor(0, 0).idx; + } + if (0 < fuPublishFreqTs) { uint64_t uTsTime = ts.descriptor(0, 0).idx; - if (0 == fuStartTime) { fuStartTime = uTsTime; } // if( 0 == fuStartTime ) + if (0 == fuStartTime) { // + fuStartTime = uTsTime; + } // if( 0 == fuStartTime ) fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9; uint64_t uSizeMb = 0; @@ -819,6 +835,36 @@ bool CbmMQTsSamplerRepReq::CreateCombinedComponentsPerBlock(std::string sBlockNa return false; } +bool CbmMQTsSamplerRepReq::SendFirstTsIndex() +{ + // create the message with the first timeslice index + std::string sIndex = FormatDecPrintout(fulFirstTsIndex); + // serialize the vector and create the message + std::stringstream oss; + boost::archive::binary_oarchive oa(oss); + oa << sIndex; + std::string* strMsg = new std::string(oss.str()); + + FairMQMessagePtr msg(NewMessage( + const_cast<char*>(strMsg->c_str()), // data + strMsg->length(), // size + [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, + strMsg)); // object that manages the data + + // in case of error or transfer interruption, + // return false to go to IDLE state + // successfull transfer will return number of bytes + // transfered (can be 0 if sending an empty message). + if (Send(msg, fsChannelNameTsRequest) < 0) { + LOG(error) << "Problem sending reply with first TS index"; + return false; + } + + fulMessageCounter++; + LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize(); + + return true; +} bool CbmMQTsSamplerRepReq::SendData(const fles::StorableTimeslice& component) { // serialize the timeslice and create the message diff --git a/MQ/source/CbmMQTsSamplerRepReq.h b/MQ/source/CbmMQTsSamplerRepReq.h index a5d186f741d5a86d22af2fa2fba127826e1d4df3..b12b6a268c8a033bdef6049d122f8a25f3a34b52 100644 --- a/MQ/source/CbmMQTsSamplerRepReq.h +++ b/MQ/source/CbmMQTsSamplerRepReq.h @@ -59,6 +59,7 @@ protected: double_t fdMinPublishTime = 0.5; double_t fdMaxPublishTime = 5; + uint64_t fulFirstTsIndex = 0; uint64_t fulPrevTsIndex = 0; uint64_t fulTsCounter = 0; uint64_t fulMessageCounter = 0; @@ -80,6 +81,7 @@ private: bool PrepareCompListPerBlock(); bool CreateCombinedComponentsPerBlock(std::string sBlockName); + bool SendFirstTsIndex(); bool SendData(const fles::StorableTimeslice& component); bool SendMissedTsIdx(std::vector<uint64_t> vIndices); bool SendCommand(std::string sCommand);