diff --git a/MQ/mcbm/CMakeLists.txt b/MQ/mcbm/CMakeLists.txt index 09a6fbb38e581ac7226ef31fb6fabf2bf15eb622..3440980a148841cac11dc1b7bf12ae558e22e132 100644 --- a/MQ/mcbm/CMakeLists.txt +++ b/MQ/mcbm/CMakeLists.txt @@ -3,10 +3,13 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in ${ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQBuildRawEvents.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQBuildRawEvents.sh) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2021.sh) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEventsCosmics2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEventsCosmics2021.sh) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startEventBuilder.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startEventBuilder.sh) set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_SOURCE_DIR}/MQ/base + ${CBMROOT_SOURCE_DIR}/algo/evbuild + ${CBMROOT_SOURCE_DIR}/algo/trigger ${CBMROOT_SOURCE_DIR}/fles/mcbm2018/unpacker ${CBMROOT_SOURCE_DIR}/fles/mcbm2018/tasks ${CBMROOT_SOURCE_DIR}/fles/mcbm2018/parameter @@ -257,6 +260,29 @@ set(DEPENDENCIES ) GENERATE_EXECUTABLE() +set(EXE_NAME EventBuilder) +set(SRCS CbmDeviceEventBuilder.cxx runEventBuilder.cxx) + +set(DEPENDENCIES + ${DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + Algo + CbmFlibFlesTools + CbmEventBuilder + KF + L1 + CbmBase + CbmRecoBase + CbmData + CbmTofBase + Core + RIO + Net + Hist + RHTTP +) +GENERATE_EXECUTABLE() #set(INCLUDE_DIRECTORIES # ${CBMDATA_DIR}/base diff --git a/MQ/mcbm/CbmDeviceEventBuilder.cxx b/MQ/mcbm/CbmDeviceEventBuilder.cxx new file mode 100644 index 0000000000000000000000000000000000000000..fca6fae708d515e9095e1c9b241c486eecf421b0 --- /dev/null +++ b/MQ/mcbm/CbmDeviceEventBuilder.cxx @@ -0,0 +1,367 @@ +/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau[committer] */ + +/** + * CbmDeviceEventBuilder.cxx + * + * @since 2021-11-18 + * @author P.-A. Loizeau + */ + +#include "CbmDeviceEventBuilder.h" + +/// CBM headers +#include "CbmEvent.h" +#include "CbmFlesCanvasTools.h" +#include "CbmMQDefs.h" +#include "CbmMatch.h" +#include "CbmMvdDigi.h" +#include "CbmTsEventHeader.h" + +/// FAIRROOT headers +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" +#include "FairRunOnline.h" + +#include "BoostSerializer.h" + +#include "RootSerializer.h" + +/// FAIRSOFT headers (geant, boost, ...) +#include "TCanvas.h" +#include "TFile.h" +#include "TH1.h" +#include "TList.h" +#include "TNamed.h" + +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +/// C/C++ headers +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; + +CbmDeviceEventBuilder::CbmDeviceEventBuilder() {} + +void CbmDeviceEventBuilder::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmDeviceEventBuilder."; + //fbFillHistos = fConfig->GetValue<bool>("FillHistos"); + //fbIgnoreTsOverlap = fConfig->GetValue<bool>("IgnOverMs"); + + // Event builder algorithm params + const std::vector<std::string> vsAddDet = fConfig->GetValue<std::vector<std::string>>("AddDet"); + const std::vector<std::string> vsSetEvbuildWin = fConfig->GetValue<std::vector<std::string>>("SetEvbuildWin"); + + // Trigger algorithm params + const std::string sTriggerDet = fConfig->GetValue<std::string>("TriggerDet"); + fTriggerWindow = fConfig->GetValue<double>("TriggerWin"); + fMinNumDigis = fConfig->GetValue<int32_t>("TriggerMinDigis"); + fDeadTime = fConfig->GetValue<double>("TriggerDeadTime"); + + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsChannelNameDataOutput = fConfig->GetValue<std::string>("EvtNameOut"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + fsAllowedChannels[0] = fsChannelNameDataInput; + + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + + // Get the information about created channels from the device + // Check if the defined channels from the topology (by name) + // are in the list of channels which are possible/allowed + // for the device + // The idea is to check at initilization if the devices are + // properly connected. For the time beeing this is done with a + // nameing convention. It is not avoided that someone sends other + // data on this channel. + //logger::SetLogLevel("INFO"); + int noChannel = fChannels.size(); + LOG(info) << "Number of defined channels: " << noChannel; + for (auto const& entry : fChannels) { + LOG(info) << "Channel name: " << entry.first; + if (std::string::npos != entry.first.find(fsChannelNameDataInput)) { + if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); + OnData(entry.first, &CbmDeviceEventBuilder::HandleData); + } + } + + /// Extract refdet + fTriggerDet = GetDetectorId(sTriggerDet); + + if (ECbmModuleId::kNotExist == fTriggerDet) { + LOG(info) << "CbmDeviceEventBuilder::InitTask => Trying to change " + "reference to unsupported detector, ignored! " + << sTriggerDet; + } + + /// Extract detector to add if any + for (std::vector<std::string>::const_iterator itStrAdd = vsAddDet.begin(); itStrAdd != vsAddDet.end(); ++itStrAdd) { + const ECbmModuleId addDet = GetDetectorId(*itStrAdd); + if (ECbmModuleId::kNotExist != addDet) { fEvbuildAlgo.AddSystem(addDet); } + else { + LOG(info) << "CbmDeviceEventBuilder::InitTask => Trying to add " + "unsupported detector, ignored! " + << (*itStrAdd); + continue; + } + } + + /// Extract event builder window to add if any + for (std::vector<std::string>::const_iterator itStrEvbuildWin = vsSetEvbuildWin.begin(); + itStrEvbuildWin != vsSetEvbuildWin.end(); ++itStrEvbuildWin) { + size_t charPosDel = (*itStrEvbuildWin).find(','); + if (std::string::npos == charPosDel) { + LOG(info) << "CbmDeviceEventBuilder::InitTask => " + << "Trying to set event builder window with invalid option pattern, ignored! " + << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << (*itStrEvbuildWin) << " )"; + continue; + } + + /// Detector Enum Tag + std::string sSelDet = (*itStrEvbuildWin).substr(0, charPosDel); + const ECbmModuleId selDet = GetDetectorId(sSelDet); + + if (ECbmModuleId::kNotExist == selDet) { + LOG(info) << "CbmDeviceEventBuilder::InitTask => " + << "Trying to set trigger window for unsupported detector, ignored! " << sSelDet; + continue; + } + + /// Window beginning + charPosDel++; + std::string sNext = (*itStrEvbuildWin).substr(charPosDel); + charPosDel = sNext.find(','); + if (std::string::npos == charPosDel) { + LOG(info) << "CbmDeviceEventBuilder::InitTask => " + << "Trying to set event builder window with invalid option pattern, ignored! " + << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << (*itStrEvbuildWin) << " )"; + continue; + } + double dWinBeg = std::stod(sNext.substr(0, charPosDel)); + + /// Window end + charPosDel++; + double dWinEnd = std::stod(sNext.substr(charPosDel)); + + fEvbuildAlgo.SetTriggerWindow(selDet, dWinBeg, dWinEnd); + } + + /// Create input vectors + fCbmTsEventHeader = new CbmTsEventHeader(); +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + // Wrapper defined in CbmMQDefs.h to support different FairMQ versions + cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); +} + +ECbmModuleId CbmDeviceEventBuilder::GetDetectorId(std::string detName) +{ + /// FIXME: Disable clang formatting for now as it corrupts all alignment + /* clang-format off */ + ECbmModuleId detId = ("kT0" == detName ? ECbmModuleId::kT0 + : ("kSts" == detName ? ECbmModuleId::kSts + : ("kMuch" == detName ? ECbmModuleId::kMuch + : ("kTrd" == detName ? ECbmModuleId::kTrd + : ("kTof" == detName ? ECbmModuleId::kTof + : ("kRich" == detName ? ECbmModuleId::kRich + : ("kPsd" == detName ? ECbmModuleId::kPsd + : ECbmModuleId::kNotExist))))))); + return detId; + /// FIXME: Re-enable clang formatting after formatted lines + /* clang-format on */ +} + +bool CbmDeviceEventBuilder::IsChannelNameAllowed(std::string channelName) +{ + for (auto const& entry : fsAllowedChannels) { + std::size_t pos1 = channelName.find(entry); + if (pos1 != std::string::npos) { + const vector<std::string>::const_iterator pos = + std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry); + const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin(); + LOG(info) << "Found " << entry << " in " << channelName; + LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; + return true; + } + } + LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; + LOG(error) << "Stop device."; + return false; +} + + +// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) +bool CbmDeviceEventBuilder::HandleData(FairMQParts& parts, int /*index*/) +{ + fulNumMessages++; + LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts" + << ", size0: " << parts.At(0)->GetSize(); + + if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; + + /// Extract unpacked data from input message + uint32_t uPartIdx = 0; + + /// TS header + Deserialize<RootSerializer>(*parts.At(uPartIdx), fCbmTsEventHeader); + ++uPartIdx; + + CbmDigiTimeslice ts; + + /// T0 + std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issT0(msgStrT0); + boost::archive::binary_iarchive inputArchiveT0(issT0); + inputArchiveT0 >> ts.fData.fT0.fDigis; + ++uPartIdx; + + /// STS + std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issSts(msgStrSts); + boost::archive::binary_iarchive inputArchiveSts(issSts); + inputArchiveSts >> ts.fData.fSts.fDigis; + ++uPartIdx; + + /// MUCH + std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issMuch(msgStrMuch); + boost::archive::binary_iarchive inputArchiveMuch(issMuch); + inputArchiveMuch >> ts.fData.fMuch.fDigis; + ++uPartIdx; + + /// TRD + std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issTrd(msgStrTrd); + boost::archive::binary_iarchive inputArchiveTrd(issTrd); + inputArchiveTrd >> ts.fData.fTrd.fDigis; + ++uPartIdx; + + /// T0F + std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issTof(msgStrTof); + boost::archive::binary_iarchive inputArchiveTof(issTof); + inputArchiveTof >> ts.fData.fTof.fDigis; + ++uPartIdx; + + /// RICH + std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issRich(msgStrRich); + boost::archive::binary_iarchive inputArchiveRich(issRich); + inputArchiveRich >> ts.fData.fRich.fDigis; + ++uPartIdx; + + /// PSD + std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issPsd(msgStrPsd); + boost::archive::binary_iarchive inputArchivePsd(issPsd); + inputArchivePsd >> ts.fData.fPsd.fDigis; + ++uPartIdx; + + LOG(debug) << "T0 Vector size: " << ts.fData.fT0.fDigis.size(); + LOG(debug) << "STS Vector size: " << ts.fData.fSts.fDigis.size(); + LOG(debug) << "MUCH Vector size: " << ts.fData.fMuch.fDigis.size(); + LOG(debug) << "TRD Vector size: " << ts.fData.fTrd.fDigis.size(); + LOG(debug) << "TOF Vector size: " << ts.fData.fTof.fDigis.size(); + LOG(debug) << "RICH Vector size: " << ts.fData.fRich.fDigis.size(); + LOG(debug) << "PSD Vector size: " << ts.fData.fPsd.fDigis.size(); + + const std::vector<double> triggers = GetTriggerTimes(ts); + + /// Create events + std::vector<CbmDigiEvent> vEvents = fEvbuildAlgo(ts, triggers); + + /// Send events vector to ouput + if (!SendEvents(parts, vEvents)) return false; + + return true; +} + +std::vector<double> CbmDeviceEventBuilder::GetTriggerTimes(const CbmDigiTimeslice& ts) +{ + std::vector<double> vDigiTimes; + switch (fTriggerDet) { + case ECbmModuleId::kMuch: { + vDigiTimes = GetDigiTimes(ts.fData.fMuch.fDigis); + break; + } + case ECbmModuleId::kSts: { + vDigiTimes = GetDigiTimes(ts.fData.fSts.fDigis); + break; + } + case ECbmModuleId::kTof: { + vDigiTimes = GetDigiTimes(ts.fData.fTof.fDigis); + break; + } + case ECbmModuleId::kTrd: { + vDigiTimes = GetDigiTimes(ts.fData.fTrd.fDigis); + break; + } + case ECbmModuleId::kRich: { + vDigiTimes = GetDigiTimes(ts.fData.fRich.fDigis); + break; + } + case ECbmModuleId::kPsd: { + vDigiTimes = GetDigiTimes(ts.fData.fPsd.fDigis); + break; + } + case ECbmModuleId::kT0: { + vDigiTimes = GetDigiTimes(ts.fData.fT0.fDigis); + break; + } + default: LOG(fatal) << "CbmDeviceEventBuilder::GetTriggerTimes(): Reading digis from unknown detector type!"; + } + return fTriggerAlgo(vDigiTimes, fTriggerWindow, fMinNumDigis, fDeadTime); +} + +bool CbmDeviceEventBuilder::SendEvents(FairMQParts& partsIn, const std::vector<CbmDigiEvent>& vEvents) +{ + LOG(debug) << "Vector size: " << vEvents.size(); + + /// Serialize the array of events into a single MQ message + /// FIXME: Find out if possible to use only the boost serializer + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &(vEvents)); + /* + std::stringstream ossEvt; + boost::archive::binary_oarchive oaEvt(ossEvt); + oaEvt << vOutEvents; + std::string* strMsgEvt = new std::string(ossEvt.str()); +*/ + + /// Add it at the end of the input composed message + /// FIXME: Find out if possible to use only the boost serializer + FairMQParts partsOut(std::move(partsIn)); + partsOut.AddPart(std::move(message)); + /* + partsOut.AddPart(NewMessage( + const_cast<char*>(strMsgEvt->c_str()), // data + strMsgEvt->length(), // size + [](void*, void* object) { delete static_cast<std::string*>(object); }, + strMsgEvt)); // object that manages the data +*/ + if (Send(partsOut, fsChannelNameDataOutput) < 0) { + LOG(error) << "Problem sending data to " << fsChannelNameDataOutput; + return false; + } + return true; +} + + +CbmDeviceEventBuilder::~CbmDeviceEventBuilder() +{ + /// Clear metadata + delete fCbmTsEventHeader; +} diff --git a/MQ/mcbm/CbmDeviceEventBuilder.h b/MQ/mcbm/CbmDeviceEventBuilder.h new file mode 100644 index 0000000000000000000000000000000000000000..2f4328ec4fdf2b2d4b2fdff0179270a8e8321b0c --- /dev/null +++ b/MQ/mcbm/CbmDeviceEventBuilder.h @@ -0,0 +1,113 @@ +/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Dominik Smith[committer] */ + +/** + * CbmDeviceEventBuilder.h + * + * @since 2022-02-01 + * @author D. Smith + */ + +#ifndef CBMDEVICEEVENTBUILDER_H_ +#define CBMDEVICEEVENTBUILDER_H_ + +/// CBM headers +#include "TimeClusterTrigger.h" + +#include "EventBuilder.h" + +/// FAIRROOT headers +#include "FairMQDevice.h" + +/// FAIRSOFT headers (geant, boost, ...) +#include "Rtypes.h" +#include "TObjArray.h" + +/// C/C++ headers +#include <chrono> +#include <vector> + +class CbmTsEventHeader; + +class CbmDeviceEventBuilder : public FairMQDevice { +public: + CbmDeviceEventBuilder(); + virtual ~CbmDeviceEventBuilder(); + +protected: + virtual void InitTask(); + bool HandleData(FairMQParts&, int); + bool HandleCommand(FairMQMessagePtr&, int); + +private: + /// Constants + + /// Control flags + //Bool_t fbIgnoreTsOverlap = kFALSE; //! Ignore data in Overlap part of the TS + //Bool_t fbFillHistos = kTRUE; //! Switch ON/OFF filling of histograms + + /// User settings parameters + /// Algo enum settings + ECbmModuleId fTriggerDet = ECbmModuleId::kT0; + /// message queues + std::string fsChannelNameDataInput = "unpts_0"; + std::string fsChannelNameDataOutput = "events"; + std::string fsChannelNameCommands = "commands"; + std::string fsChannelNameHistosInput = "histogram-in"; + /// Histograms management + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; + + /// List of MQ channels names + std::vector<std::string> fsAllowedChannels = {fsChannelNameDataInput}; + + /// Statistics & first TS rejection + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); + + /// Processing algos + cbm::algo::TimeClusterTrigger fTriggerAlgo; + cbm::algo::EventBuilder fEvbuildAlgo; + + // Trigger algorithm params + double fTriggerWindow = 0.; + int32_t fMinNumDigis = 0; + double fDeadTime = 0.; + + /// TS MetaData stable values storage + size_t fuNbCoreMsPerTs = 0; //! + size_t fuNbOverMsPerTs = 0; //! + Double_t fdMsSizeInNs = 1280000; //! Size of a single MS, [nanoseconds] + Double_t fdTsCoreSizeInNs = -1.0; //! Total size of the core MS in a TS, [nanoseconds] + Double_t fdTsOverSizeInNs = -1.0; //! Total size of the overlap MS in a TS, [nanoseconds] + Double_t fdTsFullSizeInNs = -1.0; //! Total size of all MS in a TS, [nanoseconds] + + /// Data reception + /// TS information in header + CbmTsEventHeader* fCbmTsEventHeader = nullptr; + + bool IsChannelNameAllowed(std::string channelName); + bool SendEvents(FairMQParts& partsIn, const std::vector<CbmDigiEvent>& vEvents); + + // --- Extract digi times into to a vector + template<class TDigi> + std::vector<double> GetDigiTimes(const std::vector<TDigi>& digiVec) + { + std::vector<double> digiTimes(digiVec.size()); + std::transform(digiVec.begin(), digiVec.end(), digiTimes.begin(), [](const TDigi& digi) { return digi.GetTime(); }); + return digiTimes; + } + + // Get trigger times using trigger algorithm + std::vector<double> GetTriggerTimes(const CbmDigiTimeslice& ts); + + // Get detector type from string containing name + ECbmModuleId GetDetectorId(std::string detName); + + void Finish() {}; +}; + +#endif /* CBMDEVICEEVENTBUILDER_H_ */ diff --git a/MQ/mcbm/runEventBuilder.cxx b/MQ/mcbm/runEventBuilder.cxx new file mode 100644 index 0000000000000000000000000000000000000000..9b5226374c143cb34e5dd4247ed5c5cf35638bde --- /dev/null +++ b/MQ/mcbm/runEventBuilder.cxx @@ -0,0 +1,48 @@ +/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Dominik Smith [committer] */ + +#include "CbmDeviceEventBuilder.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("TriggerWin", bpo::value<double>()->default_value(0.0), "Time window for trigger algorithm"); + options.add_options()("TriggerMinDigis", bpo::value<int32_t>()->default_value(1), + "Minimum digi count for trigger algorithm"); + options.add_options()("TriggerDeadTime", bpo::value<double>()->default_value(0.0), "Dead time for trigger algorithm"); + options.add_options()("FillHistos", bpo::value<bool>()->default_value(true), + "Fill histograms and send them to histo server if true"); + options.add_options()("IgnTsOver", bpo::value<bool>()->default_value(false), "Ignore TS overlap if true"); + options.add_options()("TriggerDet", bpo::value<std::string>()->default_value("kT0"), + "Set the trigger detector, use string matching an ECbmModuleId "); + options.add_options()("AddDet", bpo::value<std::vector<std::string>>()->multitoken()->composing(), + "Add a detector for digis selection, use string matching an ECbmModuleId "); + options.add_options()("SetEvbuildWin", bpo::value<std::vector<std::string>>()->multitoken()->composing(), + "Set event builder window for selected detector, use string matching " + "ECbmModuleId,dWinBeg,dWinEnd e.g. kSts,-10.5,100.0"); + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("unpts_0"), + "MQ channel name for unpacked TS data"); + options.add_options()("EvtNameOut", bpo::value<std::string>()->default_value("events"), + "MQ channel name for built events"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); + options.add_options()("ChNameHistCfg", bpo::value<std::string>()->default_value("histo-conf"), + "MQ channel name for histos config"); + options.add_options()("ChNameCanvCfg", bpo::value<std::string>()->default_value("canvas-conf"), + "MQ channel name for canvases config"); + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceEventBuilder(); } diff --git a/MQ/mcbm/startEventBuilder.sh.in b/MQ/mcbm/startEventBuilder.sh.in new file mode 100755 index 0000000000000000000000000000000000000000..d2c3936d39e88ac64235e4ee0d0954046377a38b --- /dev/null +++ b/MQ/mcbm/startEventBuilder.sh.in @@ -0,0 +1,249 @@ +#!/bin/bash +$SIMPATH/bin/fairmq-shmmonitor --cleanup + +if [ $# -ge 1 ]; then + _nbmoni=$1 + ((_pubfreqts = $_nbmoni*100 )) + _pubminsec=1.0 + _pubmaxsec=10.0 + + if [ $# -ge 4 ]; then + _filename="" + _dirname="" + _hostname=$4 + + if [ $# -ge 5 ]; then + _pubfreqts=$5 + + if [ $# -ge 6 ]; then + _pubminsec=$6 + + if [ $# -ge 7 ]; then + _pubmaxsec=$7 + fi + fi + fi + elif [ $# -ge 2 ]; then + _filename=$2 + _hostname="" + if [ $# -eq 3 ]; then + _dirname=$3 + else + _dirname="" + fi + else + echo 'Starting connection to local stream' + echo ' for other usages, please supply at least a filename.' + echo 'Possible usages are:' + echo 'startMQMcbmPulserMonitor2020.sh' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <full filename pattern list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <filename pattern> <folder_path>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>' + _filename="" + _dirname="" + _hostname="localhost" + fi +else + echo 'Starting connection to local stream with 1 monitor process' + echo ' for other usages, please supply at least a filename.' + echo 'Possible usages are:' + echo 'startMQMcbmPulserMonitor2020.sh' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <full filename pattern list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <filename pattern> <folder_path>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>' + _filename="" + _dirname="" + _hostname="localhost" + _nbmoni=1 + _pubfreqts=100 + _pubminsec=1.0 + _pubmaxsec=10.0 +fi + +_parfileSts=$VMCWORKDIR/macro/beamtime/mcbm2021/mStsPar.par +_parfileMuch=$VMCWORKDIR/macro/beamtime/mcbm2021/mMuchPar.par +_parfileTrdAsic=$VMCWORKDIR/parameters/trd/trd_v21b_mcbm.asic.par +_parfileTrdDigi=$VMCWORKDIR/parameters/trd/trd_v21b_mcbm.digi.par +_parfileTrdGas=$VMCWORKDIR/parameters/trd/trd_v21b_mcbm.gas.par +_parfileTrdGain=$VMCWORKDIR/parameters/trd/trd_v21b_mcbm.gain.par +_parfileTof=$VMCWORKDIR/macro/beamtime/mcbm2021/mTofCriPar.par +_parfileRich=$VMCWORKDIR/macro/beamtime/mcbm2021/mRichPar_70.par +_parfilePsd=$VMCWORKDIR/macro/beamtime/mcbm2021/mPsdPar.par +_setup_name=mcbm_beam_2021_07_surveyed +_run_id=1588 + +LOGFILETAG=`hostname` +LOGFILETAG+="_" +LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S` +LOGFILETAG+=".log" + +(( _paraBuffSz=100 )) +(( _singBuffSz=_paraBuffSz*_nbmoni )) + +echo "Buffer size for parallel devices $_paraBuffSz" +echo "Buffer size for singleton devices $_singBuffSz" + +SAMPLER="RepReqTsSampler" +SAMPLER+=" --id sampler1" +#SAMPLER+=" --max-timeslices 0" +#SAMPLER+=" --max-timeslices 10" +#SAMPLER+=" --max-timeslices 100" +SAMPLER+=" --max-timeslices 300" +#SAMPLER+=" --max-timeslices 1000" +SAMPLER+=" --severity info" +#SAMPLER+=" --flib-port 10" +if [ "$_hostname" != "" ]; then + SAMPLER+=" --fles-host $_hostname" +elif [ "$_filename" != "" ]; then + SAMPLER+=" --filename $_filename" + if [ "$_dirname" != "" ]; then + SAMPLER+=" --dirname $_dirname" + fi +fi +SAMPLER+=" --high-water-mark 1000" +SAMPLER+=" --no-split-ts 1" +SAMPLER+=" --ChNameMissTs missedts" +SAMPLER+=" --ChNameCmds commands" +SAMPLER+=" --PubFreqTs $_pubfreqts" +SAMPLER+=" --PubTimeMin $_pubminsec" +SAMPLER+=" --PubTimeMax $_pubmaxsec" +SAMPLER+=" --channel-config name=ts-request,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11555" +SAMPLER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" +SAMPLER+=" --channel-config name=missedts,type=pub,method=bind,address=tcp://127.0.0.1:11006" +SAMPLER+=" --channel-config name=commands,type=pub,method=bind,address=tcp://127.0.0.1:11007" +SAMPLER+=" --transport zeromq" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +SAMPLER_LOG="sampler1_$LOGFILETAG" +xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER & + +echo $SAMPLER + +_iMoni=0 +while (( _iMoni < _nbmoni )); do + (( _yOffset=200*_iMoni )) + (( _iMoni += 1 )) + (( _iPort = 11680 + _iMoni )) + + UNPACKER="MqUnpack" + UNPACKER+=" --id unp$_iMoni" + UNPACKER+=" --severity info" + #UNPACKER+=" --severity debug" + UNPACKER+=" --Setup $_setup_name" + UNPACKER+=" --RunId $_run_id" + UNPACKER+=" --IgnOverMs 1" + UNPACKER+=" --SetTimeOffs kSTS,-2221" + UNPACKER+=" --SetTimeOffs kMUCH,-885" + UNPACKER+=" --SetTimeOffs kTRD,0" + UNPACKER+=" --SetTimeOffs kTRD2D,-1800" + UNPACKER+=" --SetTimeOffs kTOF,-1220" + UNPACKER+=" --SetTimeOffs kRICH,254800" + UNPACKER+=" --SetTimeOffs kPSD,0" + UNPACKER+=" --PubFreqTs $_pubfreqts" + UNPACKER+=" --PubTimeMin $_pubminsec" + UNPACKER+=" --PubTimeMax $_pubmaxsec" + UNPACKER+=" --TsNameOut unpts$_iMoni" + UNPACKER+=" --channel-config name=ts-request,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11555" + UNPACKER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" + UNPACKER+=" --channel-config name=unpts$_iMoni,type=push,method=bind,transport=zeromq,address=tcp://127.0.0.1:$_iPort" +# UNPACKER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" + UNPACKER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" + UNPACKER+=" --transport zeromq" + # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX + # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log + UNPACKER_LOG="unp$_iMoni" + UNPACKER_LOG+="_$LOGFILETAG" + xterm -l -lf $UNPACKER_LOG -geometry 132x23+400+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$UNPACKER & + + EVTBUILDER="EventBuilder" + EVTBUILDER+=" --id build$_iMoni" + EVTBUILDER+=" --severity info" + #EVTBUILDER+=" --severity debug" + EVTBUILDER+=" --PubFreqTs $_pubfreqts" + EVTBUILDER+=" --PubTimeMin $_pubminsec" + EVTBUILDER+=" --PubTimeMax $_pubmaxsec" + EVTBUILDER+=" --FillHistos true" + EVTBUILDER+=" --IgnTsOver false" + EVTBUILDER+=" --TriggerDet kTof" + EVTBUILDER+=" --TriggerWin 0.0" + EVTBUILDER+=" --TriggerMinDigis 1" + EVTBUILDER+=" --TriggerDeadTime 0.0" + EVTBUILDER+=" --AddDet kSts" + EVTBUILDER+=" --AddDet kTrd" + EVTBUILDER+=" --AddDet kTof" + EVTBUILDER+=" --AddDet kRich" + EVTBUILDER+=" --AddDet kPsd" + EVTBUILDER+=" --SetEvbuildWin kSts,-100,100" + EVTBUILDER+=" --SetEvbuildWin kTrd,-250,250" + EVTBUILDER+=" --SetEvbuildWin kTof,-150,150" # To get T0 Digis (seed + close-by digis) in the event + EVTBUILDER+=" --SetEvbuildWin kRich,-100,100" + EVTBUILDER+=" --SetEvbuildWin kPsd,-100,100" + EVTBUILDER+=" --TsNameIn unpts$_iMoni" + EVTBUILDER+=" --EvtNameOut events" + EVTBUILDER+=" --channel-config name=unpts$_iMoni,type=pull,method=connect,transport=zeromq,address=tcp://127.0.0.1:$_iPort" + EVTBUILDER+=" --channel-config name=events,type=push,method=connect,transport=zeromq,address=tcp://127.0.0.1:11556" +# EVTBUILDER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" + EVTBUILDER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" + EVTBUILDER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" + EVTBUILDER+=" --transport zeromq" + # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX + # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log + EVTBUILDER_LOG="build$_iMoni" + EVTBUILDER_LOG+="_$LOGFILETAG" + xterm -l -lf $EVTBUILDER_LOG -geometry 80x23+800+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$EVTBUILDER & + +done + +EVTSINK="DigiEventSink" +EVTSINK+=" --id evtsink1" +EVTSINK+=" --severity info" +#EVTSINK+=" --severity debug" +#EVTSINK+=" --StoreFullTs 1" +EVTSINK+=" --OutFileName mcbm_digis_events.root" +EVTSINK+=" --FillHistos false" +EVTSINK+=" --PubFreqTs $_pubfreqts" +EVTSINK+=" --PubTimeMin $_pubminsec" +EVTSINK+=" --PubTimeMax $_pubmaxsec" +EVTSINK+=" --EvtNameIn events" +EVTSINK+=" --channel-config name=events,type=pull,method=bind,transport=zeromq,address=tcp://127.0.0.1:11556" +EVTSINK+=" --channel-config name=missedts,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11006" +EVTSINK+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" +EVTSINK+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +EVTSINK_LOG="evtsink1_$LOGFILETAG" +xterm -l -lf $EVTSINK_LOG -geometry 80x23+1200+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$EVTSINK & + +PARAMETERSERVER="parmq-server" +PARAMETERSERVER+=" --id parmq-server" +PARAMETERSERVER+=" --severity info" +PARAMETERSERVER+=" --channel-name parameters" +PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" +PARAMETERSERVER+=" --first-input-name $_parfileSts;$_parfileMuch;$_parfileTrdAsic;$_parfileTrdDigi;$_parfileTrdGas;$_parfileTrdGain;$_parfileTof;$_parfileRich;$_parfilePsd" +PARAMETERSERVER+=" --first-input-type ASCII" +PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem +PARAMETERSERVER+=" --setup $_setup_name" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +PARAMSRV_LOG="parmq_$LOGFILETAG" +xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1600+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER & + +HISTSERVER="MqHistoServer" +HISTSERVER+=" --id server1" +HISTSERVER+=" --severity info" +HISTSERVER+=" --histport 8081" +HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666" +HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0" +HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +HISTSRV_LOG="server1_$LOGFILETAG" +xterm -l -lf $HISTSRV_LOG -geometry 80x23+2000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &