diff --git a/reco/CMakeLists.txt b/reco/CMakeLists.txt index f17376633c84b1953d2b250a88d7861c04a04f8a..a2b6cd17bc43e259e4b7113b2aef7d157a7c35a0 100644 --- a/reco/CMakeLists.txt +++ b/reco/CMakeLists.txt @@ -14,3 +14,4 @@ add_subdirectory(tracking) add_subdirectory(qa) add_subdirectory (tasks) add_subdirectory (app) +add_subdirectory (mq) diff --git a/reco/mq/CMakeLists.txt b/reco/mq/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..1fe076d491184e784bc527d456d1486630c13a02 --- /dev/null +++ b/reco/mq/CMakeLists.txt @@ -0,0 +1,120 @@ +If(FairSoft_VERSION VERSION_LESS 18.6.0) + Add_Definitions(-DHAVE_FAIRMQSTATEMACHINE) +EndIf() + +Set(FAIRMQ_LIBS FairMQStateMachine FairMQ) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startUnpack.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startUnpack.sh) + +set(INCLUDE_DIRECTORIES + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/MQ/base + ${CBMROOT_SOURCE_DIR}/algo + ${CBMROOT_SOURCE_DIR}/algo/evbuild + ${CBMROOT_SOURCE_DIR}/algo/trigger + ${CBMROOT_SOURCE_DIR}/algo/detectors/sts + ${CBMROOT_SOURCE_DIR}/fles/flestools + ${CBMROOT_SOURCE_DIR}/reco/mq/ + ${CBMDATA_DIR} + ${CBMDATA_DIR}/raw + ${CBMDATA_DIR}/sts + ${CBMDATA_DIR}/much + ${CBMDATA_DIR}/rich + ${CBMDATA_DIR}/tof + ${CBMDATA_DIR}/psd + ${CBMDATA_DIR}/trd + ${CBMDATA_DIR}/mvd # Feint to avoid crash of DigiManager due to missing source pointer + ${CBMDATA_DIR}/base + ${CBMDATA_DIR}/global +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${ZeroMQ_INCLUDE_DIR} + ${Boost_INCLUDE_DIR} + ${FAIRROOT_INCLUDE_DIR} + ${FAIRMQ_INCLUDE_DIR} + ${FAIRMQ_INCLUDE_DIR}/options + ${FAIRLOGGER_INCLUDE_DIR} + + ${IPC_INCLUDE_DIRECTORY} + ${CBMROOT_SOURCE_DIR}/external/cppzmq +) + +include_directories(${INCLUDE_DIRECTORIES}) +include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +set(LINK_DIRECTORIES + ${KFParticle_LIB_DIR} + ${FAIRMQ_LIBRARY_DIR} + ${FAIRROOT_LIBRARY_DIR} + ${ROOT_LIBRARY_DIR} + ${Boost_LIBRARY_DIRS} +) + +link_directories(${LINK_DIRECTORIES}) + +# Set the install path within the build directory +set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/mq") +# Set the install path within the installation directory +set(BIN_DESTINATION bin/mq) + +Set(BOOST_LIBS + ${Boost_SYSTEM_LIBRARY} + ${Boost_SERIALIZATION_LIBRARY} + ${Boost_PROGRAM_OPTIONS_LIBRARY} + ${Boost_LOG_LIBRARY} +) +If(UNIX AND NOT APPLE) + List(APPEND BOOST_LIBS pthread) +EndIf() + +set(FAIR_LIBS + ${FAIRMQ_LIBS} +) + +If(FAIRLOGGER_FOUND) + set(FAIR_LIBS + ${FAIR_LIBS} + FairLogger + ) +EndIf() + +set(EXE_NAME MqDevUnpack) +set(SRCS CbmDevUnpack.cxx runUnpack.cxx) + +set(DEPENDENCIES + ${DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + fles_ipc + CbmFlibFlesTools + Algo + CbmBase + CbmRecoBase + CbmPsdReco + CbmRichReco + CbmRecoSts + CbmTofReco + CbmTrdReco + CbmData + CbmSimSteer # for CbmSetup! + Core +) +GENERATE_EXECUTABLE() + +# Set the correct variables for the installation +set(VMCWORKDIR ${CMAKE_INSTALL_PREFIX}/share/cbmroot) + +set(MY_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}) +set(CMAKE_CURRENT_SOURCE_DIR ${VMCWORKDIR}/input) + +set(TMPDIR "${CMAKE_BINARY_DIR}") +set(CMAKE_BINARY_DIR ${CMAKE_INSTALL_PREFIX}) + +# Configure file for installation directory +configure_file(${MY_SOURCE_DIR}/startUnpack.sh.in ${TMPDIR}/bin/MQ/topologies/install/startUnpack.sh) + +install(PROGRAMS ${TMPDIR}/bin/MQ/topologies/install/startUnpack.sh + DESTINATION ${CMAKE_INSTALL_PREFIX}/bin/MQ/topologies + ) diff --git a/reco/mq/CbmDevUnpack.cxx b/reco/mq/CbmDevUnpack.cxx new file mode 100644 index 0000000000000000000000000000000000000000..6f08b19f7dda4fb94d135952d45c3d61709f9e54 --- /dev/null +++ b/reco/mq/CbmDevUnpack.cxx @@ -0,0 +1,305 @@ +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau, Dominik Smith [committer] */ + +/** + * CbmDevUnpack.cxx + * + * @since 2020-05-04 + * @author P.-A. Loizeau + */ + +#include "CbmDevUnpack.h" + +#include "CbmDigiTimeslice.h" +#include "CbmMQDefs.h" + +#include "StorableTimeslice.hpp" +#include "TimesliceMetaData.h" + +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" + +#include "TStopwatch.h" + +#include "BoostSerializer.h" +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> +#include <utility> + +#include "RootSerializer.h" +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; +using cbm::algo::UnpackStsElinkPar; +using cbm::algo::UnpackStsPar; + +CbmDevUnpack::CbmDevUnpack() {} + +void CbmDevUnpack::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmDevUnpack."; + fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); + fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut"); +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + // Wrapper defined in CbmMQDefs.h to support different FairMQ versions + cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); +} + +Bool_t CbmDevUnpack::InitAlgos() +{ + /// Event header object + fCbmTsEventHeader = new CbmTsEventHeader(); + + // --- Common parameters for all components + uint32_t numChansPerAsic = 128; // R/O channels per ASIC + uint32_t numAsicsPerModule = 16; // Number of ASICs per module + + // Create one algorithm per component and configure it with parameters + auto equipIds = fStsConfig.GetEquipmentIds(); + for (auto& equip : equipIds) { + std::unique_ptr<UnpackStsPar> par(new UnpackStsPar()); + par->fNumChansPerAsic = numChansPerAsic; + par->fNumAsicsPerModule = numAsicsPerModule; + const size_t numElinks = fStsConfig.GetNumElinks(equip); + for (size_t elink = 0; elink < numElinks; elink++) { + UnpackStsElinkPar elinkPar; + auto mapEntry = fStsConfig.Map(equip, elink); + elinkPar.fAddress = mapEntry.first; // Module address for this elink + elinkPar.fAsicNr = mapEntry.second; // ASIC number within module + elinkPar.fTimeOffset = 0.; + elinkPar.fAdcOffset = 0.; + elinkPar.fAdcGain = 0.; + // TODO: Add parameters for time and ADC calibration + par->fElinkParams.push_back(elinkPar); + } + fAlgoSts[equip].SetParams(std::move(par)); + LOG(info) << "--- Configured equipment " << equip << " with " << numElinks << " elinks"; + } //# equipments + + LOG(info) << "--- Configured " << fAlgoSts.size() << " unpacker algorithms for STS."; + LOG(debug) << "Readout map:" << fStsConfig.PrintReadoutMap(); + LOG(info) << "=================================================="; + std::cout << std::endl; + + return true; +} + + +// Method called by run loop and requesting new data from the TS source whenever +bool CbmDevUnpack::ConditionalRun() +{ + /// First request a new TS (full one) + std::string message = "full"; + LOG(debug) << "Requesting new TS by sending message: full" << message; + FairMQMessagePtr req(NewSimpleMessage(message)); + FairMQMessagePtr rep(NewMessage()); + + if (Send(req, fsChannelNameDataInput) <= 0) { + LOG(error) << "Failed to send the request! message was " << message; + return false; + } + else if (Receive(rep, fsChannelNameDataInput) < 0) { + LOG(error) << "Failed to receive a reply to the request! message was " << message; + return false; + } + else if (rep->GetSize() == 0) { + LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message; + return false; + } + + /// Message received, do Algo related Initialization steps if needed + if (0 == fNumMessages) { InitAlgos(); } + + fNumMessages++; + LOG(debug) << "Received message number " << fNumMessages << " with size " << rep->GetSize(); + + if (0 == fNumMessages % 10000) LOG(info) << "Received " << fNumMessages << " messages"; + + std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize()); + std::istringstream iss(msgStr); + boost::archive::binary_iarchive inputArchive(iss); + + /// Create an empty TS and fill it with the incoming message + fles::StorableTimeslice ts {0}; + inputArchive >> ts; + + /// Extract the TS parameters from header (by definition stable over time) + const size_t NbCoreMsPerTs = ts.num_core_microslices(); + const size_t NbOverMsPerTs = ts.num_microslices(0) - ts.num_core_microslices(); + const double MsSizeInNs = (ts.descriptor(0, NbCoreMsPerTs).idx - ts.descriptor(0, 0).idx) / NbCoreMsPerTs; + const double TsCoreSizeInNs = MsSizeInNs * (NbCoreMsPerTs); + const double TsOverSizeInNs = MsSizeInNs * (NbOverMsPerTs); + const double TsFullSizeInNs = TsCoreSizeInNs + TsOverSizeInNs; + const TimesliceMetaData TsMetaData(ts.start_time(), TsCoreSizeInNs, TsOverSizeInNs, ts.index()); + + if (0 == fNumTs) { + LOG(info) << "Timeslice parameters: each TS has " << NbCoreMsPerTs << " Core MS and " << NbOverMsPerTs + << " Overlap MS, for a MS duration of " << MsSizeInNs << " ns, a core duration of " << TsCoreSizeInNs + << " ns and a full duration of " << TsFullSizeInNs << " ns"; + } + + /// Process the timeslice + CbmDigiTimeslice digiTs = DoUnpack(ts); + + LOG(debug) << "Unpack: Sending TS index " << ts.index(); + /// Send digi vectors to ouput + if (!SendData(digiTs, TsMetaData)) return false; + LOG(debug) << "Unpack: Sent TS index " << ts.index(); + + // Reset the event header for a new timeslice + fCbmTsEventHeader->Reset(); + + return true; +} + +bool CbmDevUnpack::SendData(const CbmDigiTimeslice& timeslice, const TimesliceMetaData& TsMetaData) +{ + FairMQParts parts; + + /// Prepare serialized versions of the TS Event header + FairMQMessagePtr messTsHeader(NewMessage()); + RootSerializer().Serialize(*messTsHeader, fCbmTsEventHeader); + parts.AddPart(std::move(messTsHeader)); + + /// Prepare serialized version of Digi Timeslice + std::stringstream ossTS; + boost::archive::binary_oarchive oaTS(ossTS); + oaTS << timeslice; + + std::string* strMsgTS = new std::string(ossTS.str()); + + parts.AddPart(NewMessage( + const_cast<char*>(strMsgTS->c_str()), // data + strMsgTS->length(), // size + [](void*, void* object) { delete static_cast<std::string*>(object); }, + strMsgTS)); // object that manages the data + + /// Prepare serialized versions of the TS Meta + /// FIXME: only for TS duration and overlap, should be sent to parameter service instead as stable values in run + /// Index and start time are already included in the TsHeader object! + FairMQMessagePtr messTsMeta(NewMessage()); + RootSerializer().Serialize(*messTsMeta, &TsMetaData); + parts.AddPart(std::move(messTsMeta)); + + if (Send(parts, fsChannelNameDataOutput) < 0) { + LOG(error) << "Problem sending data to " << fsChannelNameDataOutput; + return false; + } + + return true; +} + +CbmDevUnpack::~CbmDevUnpack() +{ + if (fCbmTsEventHeader) { delete fCbmTsEventHeader; } +} + +CbmDigiTimeslice CbmDevUnpack::DoUnpack(const fles::Timeslice& timeslice) +{ + fNumTs++; + + fCbmTsEventHeader->SetTsIndex(timeslice.index()); + fCbmTsEventHeader->SetTsStartTime(timeslice.start_time()); + + // Output digi timeslice + CbmDigiTimeslice digiTs; + + // --- Timer + TStopwatch timer; + TStopwatch compTimer; + timer.Start(); + + // --- Timeslice properties + const uint64_t tsIndex = timeslice.index(); + const uint64_t tsTime = timeslice.start_time(); + const uint64_t numComp = timeslice.num_components(); + uint64_t numCompUsed = 0; + + // --- Counters + size_t numMs = 0; + size_t numBytes = 0; + size_t numDigis = 0; + + // --- Component loop + for (uint64_t comp = 0; comp < numComp; comp++) { + + auto systemId = static_cast<fles::SubsystemIdentifier>(timeslice.descriptor(comp, 0).sys_id); + if (systemId == fles::SubsystemIdentifier::STS) { + const uint16_t equipmentId = timeslice.descriptor(comp, 0).eq_id; + const auto algoIt = fAlgoSts.find(equipmentId); + assert(algoIt != fAlgoSts.end()); + + // The current algorithm works for the STS data format version 0x20 used in 2021. + // Other versions are not yet supported. + // In the future, different data formats will be supported by instantiating different + // algorithms depending on the version. + assert(timeslice.descriptor(comp, 0).sys_ver == 0x20); + + // --- Component log + size_t numBytesInComp = 0; + size_t numDigisInComp = 0; + compTimer.Start(); + + // --- Microslice loop + uint64_t numMsInComp = timeslice.num_microslices(comp); + for (uint64_t mslice = 0; mslice < numMsInComp; mslice++) { + const auto msDescriptor = timeslice.descriptor(comp, mslice); + const auto msContent = timeslice.content(comp, mslice); + numBytesInComp += msDescriptor.size; + auto result = (algoIt->second)(msContent, msDescriptor, tsTime); + LOG(debug1) << "CbmDevUnpack::DoUnpack(): Component " << comp << ", microslice " << mslice << ", digis " + << result.first.size() << ", errors " << result.second.fNumNonHitOrTsbMessage << " | " + << result.second.fNumErrElinkOutOfRange << " | " << result.second.fNumErrInvalidFirstMessage + << " | " << result.second.fNumErrInvalidMsSize << " | " << result.second.fNumErrTimestampOverflow + << " | "; + const auto it = digiTs.fData.fSts.fDigis.end(); + digiTs.fData.fSts.fDigis.insert(it, result.first.begin(), result.first.end()); + numDigisInComp += result.first.size(); + } + + compTimer.Stop(); + LOG(debug) << "CbmDevUnpack::DoUnpack(): Component " << comp << ", microslices " << numMsInComp << " input size " + << numBytesInComp << " bytes, " + << ", digis " << numDigisInComp << ", CPU time " << compTimer.CpuTime() * 1000. << " ms"; + numCompUsed++; + numBytes += numBytesInComp; + numDigis += numDigisInComp; + numMs += numMsInComp; + } + + } //# component + + // --- Add Sts Digis to header + fCbmTsEventHeader->AddNDigisSts(numDigis); + + // --- Sorting of output digis. Is required by both digi trigger and event builder. + std::sort(digiTs.fData.fSts.fDigis.begin(), digiTs.fData.fSts.fDigis.end(), + [](CbmStsDigi digi1, CbmStsDigi digi2) { return digi1.GetTime() < digi2.GetTime(); }); + + // --- Timeslice log + timer.Stop(); + stringstream logOut; + logOut << setw(15) << left << "CbmDevUnpack::DoUnpackGetName(): ["; + logOut << fixed << setw(8) << setprecision(1) << right << timer.RealTime() * 1000. << " ms] "; + logOut << "TS " << fNumTs << " (index " << tsIndex << ")"; + logOut << ", components " << numCompUsed << " / " << numComp << ", microslices " << numMs; + logOut << ", input rate " << double(numBytes) / timer.RealTime() / 1.e6 << " MB/s"; + logOut << ", digis " << numDigis; + LOG(debug) << logOut.str(); + + if (0 == fNumTs % 10000) LOG(info) << "Processed " << fNumTs << " time slices"; + + return digiTs; +} diff --git a/reco/mq/CbmDevUnpack.h b/reco/mq/CbmDevUnpack.h new file mode 100644 index 0000000000000000000000000000000000000000..ef273d4bdf2f0c0eae5f5ee2d5246358d19a86d9 --- /dev/null +++ b/reco/mq/CbmDevUnpack.h @@ -0,0 +1,76 @@ +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau, Dominik Smith [committer] */ + +/** + * CbmDevUnpack.h + * + * @since 2020-05-04 + * @author P.-A. Loizeau + */ + +#ifndef CBMDEVUNPACK_H +#define CBMDEVUNPACK_H + +#include "CbmMqTMessage.h" +#include "CbmTsEventHeader.h" + +#include "Timeslice.hpp" + +#include "FairMQDevice.h" +#include "FairParGenericSet.h" + +#include "Rtypes.h" +#include "TObjArray.h" + +#include <chrono> +#include <map> +#include <vector> + +#include "StsReadoutConfig.h" +#include "UnpackSts.h" + +class TimesliceMetaData; +struct CbmDigiTimeslice; + +class CbmDevUnpack : public FairMQDevice { +public: + CbmDevUnpack(); + virtual ~CbmDevUnpack(); + +private: + std::map<uint16_t, cbm::algo::UnpackSts> fAlgoSts = {}; + cbm::algo::StsReadoutConfig fStsConfig {}; + + /// message queues + std::string fsChannelNameDataInput = "ts-request"; + std::string fsChannelNameDataOutput = "unpts_0"; + std::string fsChannelNameCommands = "commands"; + + /// Statistics & first TS rejection + size_t fNumMessages = 0; + size_t fNumTs = 0; + + /// Pointer to the Timeslice header conatining start time and index + CbmTsEventHeader* fCbmTsEventHeader = nullptr; + + /** @brief Read command line parameters for MQ device */ + virtual void InitTask(); + + /** @brief Called by run loop, does init steps on first TS */ + bool ConditionalRun(); + + /** @brief Initialize runtime parameters for UnpackSts algos */ + bool InitAlgos(); + + /** + * @brief Unpack a single timeslice + * @param ts Input FLES timeslice + */ + CbmDigiTimeslice DoUnpack(const fles::Timeslice& ts); + + /** @brief Serialize unpacked digi timeslice and send to output channel */ + bool SendData(const CbmDigiTimeslice& timeslice, const TimesliceMetaData& TsMetaData); +}; + +#endif /* CBMDEVUNPACK_H */ diff --git a/reco/mq/runUnpack.cxx b/reco/mq/runUnpack.cxx new file mode 100644 index 0000000000000000000000000000000000000000..8116b39f51a44e493de079a39d8f419a402a1774 --- /dev/null +++ b/reco/mq/runUnpack.cxx @@ -0,0 +1,23 @@ +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau, Dominik Smith [committer] */ + +#include "CbmDevUnpack.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"), + "MQ channel name for raw TS data"); + options.add_options()("TsNameOut", bpo::value<std::string>()->default_value("unpts_0"), + "MQ channel name for unpacked TS data"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDevUnpack(); } diff --git a/reco/mq/startUnpack.sh.in b/reco/mq/startUnpack.sh.in new file mode 100755 index 0000000000000000000000000000000000000000..37b772b9d971854bcdc1f569531fc108bd976b27 --- /dev/null +++ b/reco/mq/startUnpack.sh.in @@ -0,0 +1,197 @@ +#!/bin/bash + +if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then + @SIMPATH@/bin/fairmq-shmmonitor --cleanup +fi + +if [ $# -ge 1 ]; then + _nbmoni=$1 + ((_pubfreqts = $_nbmoni*100 )) + _pubminsec=1.0 + _pubmaxsec=10.0 + + if [ $# -ge 4 ]; then + _filename="" + _dirname="" + _hostname=$4 + + if [ $# -ge 5 ]; then + _pubfreqts=$5 + + if [ $# -ge 6 ]; then + _pubminsec=$6 + + if [ $# -ge 7 ]; then + _pubmaxsec=$7 + fi + fi + fi + elif [ $# -ge 2 ]; then + _filename=$2 + _hostname="" + if [ $# -eq 3 ]; then + _dirname=$3 + else + _dirname="" + fi + else + echo 'Starting connection to local stream' + echo ' for other usages, please supply at least a filename.' + echo 'Possible usages are:' + echo 'startMQMcbmPulserMonitor2020.sh' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <full filename pattern list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <filename pattern> <folder_path>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>' + _filename="" + _dirname="" + _hostname="localhost" + fi +else + echo 'Starting connection to local stream with 1 monitor process' + echo ' for other usages, please supply at least a filename.' + echo 'Possible usages are:' + echo 'startMQMcbmPulserMonitor2020.sh' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <full filename pattern list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> <filename pattern> <folder_path>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>' + echo 'startMQMcbmPulserMonitor2020.sh <Nb Unp & Moni processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>' + _filename="" + _dirname="" + _hostname="localhost" + _nbmoni=1 + _pubfreqts=100 + _pubminsec=1.0 + _pubmaxsec=10.0 +fi + +_parfileSts=@VMCWORKDIR@/macro/beamtime/mcbm2021/mStsPar.par +_parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2021/mMuchPar.par +_parfileTrdAsic=@VMCWORKDIR@/parameters/trd/trd_v21b_mcbm.asic.par +_parfileTrdDigi=@VMCWORKDIR@/parameters/trd/trd_v21b_mcbm.digi.par +_parfileTrdGas=@VMCWORKDIR@/parameters/trd/trd_v21b_mcbm.gas.par +_parfileTrdGain=@VMCWORKDIR@/parameters/trd/trd_v21b_mcbm.gain.par +_parfileTof=@VMCWORKDIR@/macro/beamtime/mcbm2021/mTofCriPar.par +_parfileRich=@VMCWORKDIR@/macro/beamtime/mcbm2021/mRichPar_70.par +_parfilePsd=@VMCWORKDIR@/macro/beamtime/mcbm2021/mPsdPar.par +_setup_name=mcbm_beam_2021_07_surveyed +_run_id=1588 + +LOGFILETAG=`hostname` +LOGFILETAG+="_" +LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S` +LOGFILETAG+=".log" + +(( _paraBuffSz=100 )) +(( _singBuffSz=_paraBuffSz*_nbmoni )) + +echo "Buffer size for parallel devices $_paraBuffSz" +echo "Buffer size for singleton devices $_singBuffSz" + +SAMPLER="RepReqTsSampler" +SAMPLER+=" --id sampler1" +#SAMPLER+=" --max-timeslices 0" +#SAMPLER+=" --max-timeslices 10" +#SAMPLER+=" --max-timeslices 100" +SAMPLER+=" --max-timeslices 300" +#SAMPLER+=" --max-timeslices 1000" +SAMPLER+=" --severity info" +#SAMPLER+=" --flib-port 10" +if [ "$_hostname" != "" ]; then + SAMPLER+=" --fles-host $_hostname" +elif [ "$_filename" != "" ]; then + SAMPLER+=" --filename $_filename" + if [ "$_dirname" != "" ]; then + SAMPLER+=" --dirname $_dirname" + fi +fi +SAMPLER+=" --high-water-mark 1000" +SAMPLER+=" --no-split-ts 1" +SAMPLER+=" --ChNameMissTs missedts" +SAMPLER+=" --ChNameCmds commands" +SAMPLER+=" --PubFreqTs $_pubfreqts" +SAMPLER+=" --PubTimeMin $_pubminsec" +SAMPLER+=" --PubTimeMax $_pubmaxsec" +SAMPLER+=" --channel-config name=ts-request,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11555" +SAMPLER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" +SAMPLER+=" --channel-config name=missedts,type=pub,method=bind,address=tcp://127.0.0.1:11006" +SAMPLER+=" --channel-config name=commands,type=pub,method=bind,address=tcp://127.0.0.1:11007" +SAMPLER+=" --transport zeromq" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +SAMPLER_LOG="sampler1_$LOGFILETAG" +xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER & + +echo $SAMPLER + +_iMoni=0 +while (( _iMoni < _nbmoni )); do + (( _yOffset=200*_iMoni )) + (( _iMoni += 1 )) + (( _iPort = 11680 + _iMoni )) + + ### UNPACKER="MqUnpack" # old version + UNPACKER="MqDevUnpack" #new algo version + UNPACKER+=" --id unp$_iMoni" + UNPACKER+=" --severity info" + #UNPACKER+=" --severity debug" + UNPACKER+=" --Setup $_setup_name" + UNPACKER+=" --RunId $_run_id" + UNPACKER+=" --IgnOverMs 1" + UNPACKER+=" --SetTimeOffs kSTS,-2221" + UNPACKER+=" --SetTimeOffs kMUCH,-885" + UNPACKER+=" --SetTimeOffs kTRD,0" + UNPACKER+=" --SetTimeOffs kTRD2D,-1800" + UNPACKER+=" --SetTimeOffs kTOF,-1220" + UNPACKER+=" --SetTimeOffs kRICH,254800" + UNPACKER+=" --SetTimeOffs kPSD,0" + UNPACKER+=" --PubFreqTs $_pubfreqts" + UNPACKER+=" --PubTimeMin $_pubminsec" + UNPACKER+=" --PubTimeMax $_pubmaxsec" + UNPACKER+=" --TsNameOut unpts$_iMoni" + UNPACKER+=" --channel-config name=ts-request,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11555" + UNPACKER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" + UNPACKER+=" --channel-config name=unpts$_iMoni,type=push,method=bind,transport=zeromq,address=tcp://127.0.0.1:$_iPort" +# UNPACKER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" + UNPACKER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666" + UNPACKER+=" --transport zeromq" + # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX + # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log + UNPACKER_LOG="unp$_iMoni" + UNPACKER_LOG+="_$LOGFILETAG" + xterm -l -lf $UNPACKER_LOG -geometry 132x23+400+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$UNPACKER & + + +done + +PARAMETERSERVER="parmq-server" +PARAMETERSERVER+=" --id parmq-server" +PARAMETERSERVER+=" --severity info" +PARAMETERSERVER+=" --channel-name parameters" +PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0" +PARAMETERSERVER+=" --first-input-name $_parfileSts;$_parfileMuch;$_parfileTrdAsic;$_parfileTrdDigi;$_parfileTrdGas;$_parfileTrdGain;$_parfileTof;$_parfileRich;$_parfilePsd" +PARAMETERSERVER+=" --first-input-type ASCII" +PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem +PARAMETERSERVER+=" --setup $_setup_name" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +PARAMSRV_LOG="parmq_$LOGFILETAG" +xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1600+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER & + +HISTSERVER="MqHistoServer" +HISTSERVER+=" --id server1" +HISTSERVER+=" --severity info" +HISTSERVER+=" --histport 8081" +HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666" +HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0" +HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0" +# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX +# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log +HISTSRV_LOG="server1_$LOGFILETAG" +xterm -l -lf $HISTSRV_LOG -geometry 80x23+2000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &