with 509 additions and 1363 deletions
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmMqTMessage.h"
#include "Timeslice.hpp"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TObjArray.h"
#include <chrono>
#include <map>
#include <vector>
class TList;
class CbmMcbm2018MonitorAlgoBmon;
class CbmDeviceMonitorReqBmon : public FairMQDevice {
virtual ~CbmDeviceMonitorReqBmon();
virtual void InitTask();
virtual bool ConditionalRun();
/// Constants
static const uint16_t kusSysId = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput = "ts-request";
std::string fsTsBlockName = "t0block";
std::string fsChannelNameHistosInput = "histogram-in";
uint32_t fuHistoryHistoSize = 3600;
uint32_t fuMinTotPulser = 185;
uint32_t fuMaxTotPulser = 195;
uint32_t fuOffSpillCountLimit = 25;
uint32_t fuOffSpillCountLimitNonPulser = 10;
double fdSpillCheckInterval = 0.0128;
std::vector<uint32_t> fvuChanMap = {0, 1, 2, 3, 4, 5, 6, 7};
uint32_t fuPublishFreqTs = 100;
double_t fdMinPublishTime = 0.5;
double_t fdMaxPublishTime = 5.0;
/// Parameters management
TList* fParCList = nullptr;
/// Statistics & first TS rejection
uint64_t fulNumMessages = 0;
uint64_t fulTsCounter = 0;
std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
/// Processing algo
CbmMcbm2018MonitorAlgoBmon* fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto = {};
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
/// Flag indicating whether the histograms and canvases configurations were already published
bool fbConfigSent = false;
bool InitContainers();
bool InitHistograms();
bool DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistoConfAndData();
bool SendHistograms();
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmDeviceMonitorReqT0.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMcbm2018MonitorAlgoT0.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include "BoostSerializer.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
#include "RootSerializer.h"
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
using namespace std;
CbmDeviceMonitorReqT0::CbmDeviceMonitorReqT0() : fMonitorAlgo {new CbmMcbm2018MonitorAlgoT0()} {}
void CbmDeviceMonitorReqT0::InitTask()
try {
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
fuOffSpillCountLimit = fConfig->GetValue<uint32_t>("SpillThr");
fuOffSpillCountLimitNonPulser = fConfig->GetValue<uint32_t>("SpillThrNonPuls");
fdSpillCheckInterval = fConfig->GetValue<double>("SpillCheckInt");
std::string sChanMap = fConfig->GetValue<std::string>("ChanMap");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
UInt_t uChanIdx = 0;
size_t charPosDel = sChanMap.find(',');
while (uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel) {
fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
sChanMap = sChanMap.substr(charPosDel + 1);
charPosDel = sChanMap.find(',');
} // while( uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel )
if (uChanIdx < fvuChanMap.size()) {
fvuChanMap[uChanIdx] = std::stoul(sChanMap);
} // if( uChanIdx < fvuChanMap.size() )
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
if ("" == fsTsBlockName) {
LOG(info) << "Requesting TS using the SysId: 0x" << std::hex << static_cast<int>(kusSysId) << std::dec;
else {
LOG(info) << "Requesting TS using the following block name: " << fsTsBlockName;
catch (InitTaskError& e) {
LOG(error) << e.what();
bool CbmDeviceMonitorReqT0::InitContainers()
LOG(info) << "Init parameter containers for CbmDeviceMonitorReqT0.";
fParCList = fMonitorAlgo->GetParList();
for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
std::string paramName {tempObj->GetName()};
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage(message));
FairMQMessagePtr rep(NewMessage());
FairParGenericSet* newObj = nullptr;
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from the server:";
else {
LOG(error) << "Received empty reply. Parameter not available";
} // if (rep->GetSize() != 0)
} // if (Receive(rep, "parameters") >= 0)
} // if (Send(req, "parameters") > 0)
fParCList->AddAt(newObj, iparC);
delete tempObj;
} // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
/// Need to add accessors for all options
fMonitorAlgo->SetPulserTotLimits(fuMinTotPulser, fuMaxTotPulser);
fMonitorAlgo->SetChannelMap(fvuChanMap[0], fvuChanMap[1], fvuChanMap[2], fvuChanMap[3], fvuChanMap[4], fvuChanMap[5],
fvuChanMap[6], fvuChanMap[7]);
// fMonitorAlgo->AddMsComponentToList(0, 0x90);
Bool_t initOK = fMonitorAlgo->InitContainers();
return initOK;
bool CbmDeviceMonitorReqT0::InitHistograms()
/// Histos creation and obtain pointer on them
/// Trigger histo creation on all associated algos
bool initOK = fMonitorAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ]
// ;
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
LOG(info) << "Config of hist " << << " in folder " <<;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ];
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
LOG(info) << "Config string of Canvas " << << " is " <<;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
bool CbmDeviceMonitorReqT0::ConditionalRun()
/// First request a new TS (full or single system components or multi-syst components block)
std::string message = fsTsBlockName;
if ("" == message) message = std::to_string(kusSysId);
LOG(debug) << "Requesting new TS by sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage(message));
FairMQMessagePtr rep(NewMessage());
if (Send(req, fsChannelNameDataInput) <= 0) {
LOG(error) << "Failed to send the request! message was " << message;
return false;
} // if (Send(req, fsChannelNameDataInput) <= 0)
else if (Receive(rep, fsChannelNameDataInput) < 0) {
LOG(error) << "Failed to receive a reply to the request! message was " << message;
return false;
} // else if (Receive(rep, fsChannelNameDataInput) < 0)
else if (rep->GetSize() == 0) {
LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
return false;
} // else if (rep->GetSize() == 0)
/// Message received, do Algo related Initialization steps if needed
if (0 == fulNumMessages) {
try {
catch (InitTaskError& e) {
LOG(error) << e.what();
} // if( 0 == fulNumMessages)
if (0 == fulNumMessages) InitHistograms();
LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
/// Create an empty TS and fill it with the incoming message
fles::StorableTimeslice component {0};
inputArchive >> component;
/// Process the Timeslice
DoUnpack(component, 0);
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
if (!fbConfigSent) {
// Send the configuration only once per run!
fbConfigSent = SendHistoConfAndData();
} // if( !fbConfigSent )
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
return true;
bool CbmDeviceMonitorReqT0::SendHistoConfAndData()
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
FairMQParts partsOut;
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
/// Send the multi-parts message to the common histogram messages queue
if (Send(partsOut, fsChannelNameHistosInput) < 0) {
LOG(error) << "CbmDeviceMonitorReqT0::SendHistoConfAndData => Problem sending data";
return false;
} // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
return true;
bool CbmDeviceMonitorReqT0::SendHistograms()
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
Serialize<RootSerializer>(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
return true;
CbmDeviceMonitorReqT0::~CbmDeviceMonitorReqT0() {}
Bool_t CbmDeviceMonitorReqT0::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
if (kFALSE == fbComponentsAddedToList) {
for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysId);
} // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
return kTRUE;
} // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
/// Clear the digis vector in case it was filled
if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
return kTRUE;
void CbmDeviceMonitorReqT0::Finish() {}
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "Timeslice.hpp"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TMessage.h"
#include "TObjArray.h"
#include <chrono>
#include <map>
#include <vector>
class TList;
class CbmMcbm2018MonitorAlgoT0;
class CbmDeviceMonitorReqT0 : public FairMQDevice {
virtual ~CbmDeviceMonitorReqT0();
virtual void InitTask();
virtual bool ConditionalRun();
/// Constants
static const uint16_t kusSysId = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput = "ts-request";
std::string fsTsBlockName = "t0block";
std::string fsChannelNameHistosInput = "histogram-in";
uint32_t fuHistoryHistoSize = 3600;
uint32_t fuMinTotPulser = 185;
uint32_t fuMaxTotPulser = 195;
uint32_t fuOffSpillCountLimit = 25;
uint32_t fuOffSpillCountLimitNonPulser = 10;
double fdSpillCheckInterval = 0.0128;
std::vector<uint32_t> fvuChanMap = {0, 1, 2, 3, 4, 5, 6, 7};
uint32_t fuPublishFreqTs = 100;
double_t fdMinPublishTime = 0.5;
double_t fdMaxPublishTime = 5.0;
/// Parameters management
TList* fParCList = nullptr;
/// Statistics & first TS rejection
uint64_t fulNumMessages = 0;
uint64_t fulTsCounter = 0;
std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
/// Processing algo
CbmMcbm2018MonitorAlgoT0* fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto = {};
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
/// Flag indicating whether the histograms and canvases configurations were already published
bool fbConfigSent = false;
bool InitContainers();
bool InitHistograms();
bool DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistoConfAndData();
bool SendHistograms();
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage {
CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
......@@ -96,7 +96,7 @@ Bool_t CbmDeviceMonitorReqTof::InitContainers()
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from the server:";
......@@ -246,7 +246,8 @@ bool CbmDeviceMonitorReqTof::SendHistoConfAndData()
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
// Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
FairMQParts partsOut;
......@@ -254,7 +255,8 @@ bool CbmDeviceMonitorReqTof::SendHistoConfAndData()
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
......@@ -262,14 +264,16 @@ bool CbmDeviceMonitorReqTof::SendHistoConfAndData()
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
// Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
RootSerializer().Serialize(*msgHistos, &fArrayHisto);
......@@ -289,7 +293,8 @@ bool CbmDeviceMonitorReqTof::SendHistograms()
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
Serialize<RootSerializer>(*message, &fArrayHisto);
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
......@@ -316,9 +321,9 @@ Bool_t CbmDeviceMonitorReqTof::DoUnpack(const fles::Timeslice& ts, size_t /*comp
if (kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdTof);
} // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
else if (kusSysIdT0 == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdT0);
} // if( kusSysIdT0 == ts.descriptor( uCompIdx, 0 ).sys_id )
else if (kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdBmon);
} // if( kusSysIdBmon == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
......@@ -5,12 +5,13 @@
#include "CbmMqTMessage.h"
#include "Timeslice.hpp"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TMessage.h"
#include "TObjArray.h"
#include <chrono>
......@@ -32,7 +33,7 @@ protected:
/// Constants
static const uint16_t kusSysIdTof = 0x60;
static const uint16_t kusSysIdT0 = 0x90;
static const uint16_t kusSysIdBmon = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
......@@ -82,11 +83,4 @@ private:
bool SendHistograms();
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage {
CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
* CbmDeviceMonitorT0.cxx
* @since 2019-03-26
* @author F. Uhlig
#include "CbmDeviceMonitorT0.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMcbm2018MonitorAlgoT0.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include "BoostSerializer.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
#include "RootSerializer.h"
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
using namespace std;
: fbIgnoreOverlapMs {false}
, fsChannelNameDataInput {"t0component"}
, fsChannelNameHistosInput {"histogram-in"}
, fuHistoryHistoSize {3600}
, fuMinTotPulser {185}
, fuMaxTotPulser {195}
, fuOffSpillCountLimit {25}
, fuOffSpillCountLimitNonPulser {10}
, fdSpillCheckInterval {0.0128}
, fvuChanMap {0, 1, 2, 3, 4, 5, 6, 7}
, fuPublishFreqTs {100}
, fdMinPublishTime {0.5}
, fdMaxPublishTime {5.0}
, fsAllowedChannels {fsChannelNameDataInput}
, fParCList {nullptr}
, fulNumMessages {0}
, fulTsCounter {0}
, fLastPublishTime {std::chrono::system_clock::now()}
, fMonitorAlgo {new CbmMcbm2018MonitorAlgoT0()}
, fArrayHisto {}
, fvpsHistosFolder {}
, fvpsCanvasConfig {}
void CbmDeviceMonitorT0::InitTask()
try {
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
fuOffSpillCountLimit = fConfig->GetValue<uint32_t>("SpillThr");
fuOffSpillCountLimitNonPulser = fConfig->GetValue<uint32_t>("SpillThrNonPuls");
fdSpillCheckInterval = fConfig->GetValue<double>("SpillCheckInt");
std::string sChanMap = fConfig->GetValue<std::string>("ChanMap");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;
UInt_t uChanIdx = 0;
size_t charPosDel = sChanMap.find(',');
while (uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel) {
LOG(info) << sChanMap.substr(0, charPosDel);
fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
LOG(info) << fvuChanMap[uChanIdx];
sChanMap = sChanMap.substr(charPosDel + 1);
LOG(info) << sChanMap;
charPosDel = sChanMap.find(',');
} // while( uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel )
if (uChanIdx < fvuChanMap.size()) {
LOG(info) << sChanMap;
fvuChanMap[uChanIdx] = std::stoul(sChanMap);
LOG(info) << fvuChanMap[uChanIdx];
} // if( uChanIdx < fvuChanMap.size() )
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMonitorT0::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
catch (InitTaskError& e) {
LOG(error) << e.what();
bool CbmDeviceMonitorT0::IsChannelNameAllowed(std::string channelName)
for (auto const& entry : fsAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
bool CbmDeviceMonitorT0::InitContainers()
LOG(info) << "Init parameter containers for CbmDeviceMonitorT0.";
fParCList = fMonitorAlgo->GetParList();
for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
std::string paramName {tempObj->GetName()};
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage(message));
FairMQMessagePtr rep(NewMessage());
FairParGenericSet* newObj = nullptr;
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from the server:";
else {
LOG(error) << "Received empty reply. Parameter not available";
} // if (rep->GetSize() != 0)
} // if (Receive(rep, "parameters") >= 0)
} // if (Send(req, "parameters") > 0)
fParCList->AddAt(newObj, iparC);
delete tempObj;
} // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
/// Need to add accessors for all options
fMonitorAlgo->SetPulserTotLimits(fuMinTotPulser, fuMaxTotPulser);
fMonitorAlgo->SetChannelMap(fvuChanMap[0], fvuChanMap[1], fvuChanMap[2], fvuChanMap[3], fvuChanMap[4], fvuChanMap[5],
fvuChanMap[6], fvuChanMap[7]);
// fMonitorAlgo->AddMsComponentToList(0, 0x90);
Bool_t initOK = fMonitorAlgo->InitContainers();
return initOK;
bool CbmDeviceMonitorT0::InitHistograms()
/// Histos creation and obtain pointer on them
/// Trigger histo creation on all associated algos
bool initOK = fMonitorAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ]
// ;
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
LOG(info) << "Config of hist " << << " in folder " <<;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ];
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
LOG(info) << "Config string of Canvas " << << " is " <<;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMonitorT0::HandleData(FairMQMessagePtr& msg, int /*index*/)
if (0 == fulNumMessages) {
try {
catch (InitTaskError& e) {
LOG(error) << e.what();
} // if( 0 == fulNumMessages)
if (0 == fulNumMessages) InitHistograms();
LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
/// Create an empty TS and fill it with the incoming message
fles::StorableTimeslice component {0};
inputArchive >> component;
/// Process the Timeslice
DoUnpack(component, 0);
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
if (!fbConfigSent) {
// Send the configuration only once per run!
fbConfigSent = SendHistoConfAndData();
} // if( !fbConfigSent )
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
return true;
bool CbmDeviceMonitorT0::SendHistoConfAndData()
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
FairMQParts partsOut;
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
/// Send the multi-parts message to the common histogram messages queue
if (Send(partsOut, fsChannelNameHistosInput) < 0) {
LOG(error) << "CbmDeviceMonitorT0::SendHistoConfAndData => Problem sending data";
return false;
} // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
return true;
bool CbmDeviceMonitorT0::SendHistograms()
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
Serialize<RootSerializer>(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
return true;
CbmDeviceMonitorT0::~CbmDeviceMonitorT0() {}
Bool_t CbmDeviceMonitorT0::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
if (kFALSE == fbComponentsAddedToList) {
for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysId);
} // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
return kTRUE;
} // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
/// Clear the digis vector in case it was filled
if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
return kTRUE;
void CbmDeviceMonitorT0::Finish() {}
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
* CbmDeviceMonitorT0.h
* @since 2019-03-26
* @author F. Uhlig
#include "Timeslice.hpp"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TMessage.h"
#include "TObjArray.h"
#include <chrono>
#include <map>
#include <vector>
class TList;
class CbmMcbm2018MonitorAlgoT0;
class CbmDeviceMonitorT0 : public FairMQDevice {
virtual ~CbmDeviceMonitorT0();
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
/// Constants
static const uint16_t kusSysId = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput;
std::string fsChannelNameHistosInput;
uint32_t fuHistoryHistoSize;
uint32_t fuMinTotPulser;
uint32_t fuMaxTotPulser;
uint32_t fuOffSpillCountLimit;
uint32_t fuOffSpillCountLimitNonPulser;
double fdSpillCheckInterval;
std::vector<uint32_t> fvuChanMap;
uint32_t fuPublishFreqTs;
double_t fdMinPublishTime;
double_t fdMaxPublishTime;
/// List of MQ channels names
std::vector<std::string> fsAllowedChannels;
/// Parameters management
TList* fParCList;
/// Statistics & first TS rejection
uint64_t fulNumMessages;
uint64_t fulTsCounter;
std::chrono::system_clock::time_point fLastPublishTime;
/// Processing algo
CbmMcbm2018MonitorAlgoT0* fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto;
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder;
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig;
/// Flag indicating whether the histograms and canvases configurations were already published
bool fbConfigSent = false;
bool IsChannelNameAllowed(std::string channelName);
bool InitContainers();
bool InitHistograms();
bool DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistoConfAndData();
bool SendHistograms();
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage {
CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
......@@ -141,7 +141,7 @@ Bool_t CbmDeviceMonitorTof::InitContainers()
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from the server:";
......@@ -270,7 +270,8 @@ bool CbmDeviceMonitorTof::SendHistoConfAndData()
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
// Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
FairMQParts partsOut;
......@@ -278,7 +279,8 @@ bool CbmDeviceMonitorTof::SendHistoConfAndData()
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
......@@ -286,14 +288,16 @@ bool CbmDeviceMonitorTof::SendHistoConfAndData()
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
// Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
RootSerializer().Serialize(*msgHistos, &fArrayHisto);
......@@ -313,7 +317,8 @@ bool CbmDeviceMonitorTof::SendHistograms()
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
Serialize<RootSerializer>(*message, &fArrayHisto);
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
......@@ -340,9 +345,9 @@ Bool_t CbmDeviceMonitorTof::DoUnpack(const fles::Timeslice& ts, size_t /*compone
if (kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdTof);
} // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
else if (kusSysIdT0 == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdT0);
} // if( kusSysIdT0 == ts.descriptor( uCompIdx, 0 ).sys_id )
else if (kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdBmon);
} // if( kusSysIdBmon == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
......@@ -12,12 +12,13 @@
#include "CbmMqTMessage.h"
#include "Timeslice.hpp"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TMessage.h"
#include "TObjArray.h"
#include <chrono>
......@@ -39,7 +40,7 @@ protected:
/// Constants
static const uint16_t kusSysIdTof = 0x60;
static const uint16_t kusSysIdT0 = 0x90;
static const uint16_t kusSysIdBmon = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
......@@ -92,11 +93,4 @@ private:
bool SendHistograms();
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage {
CbmMQTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
#include "CbmDeviceMonitorBmon.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection");
options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10),
"Non pulser Hits Nb Thr for spill detection");
options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128),
"Interval in seconds between count checks for spill detection");
options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"),
"Set Bmon channel map e.g. 0,1,2,3,4,5,6,7");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("t0component"),
"MQ channel name for TS data");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorBmon(); }
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmDeviceMonitorReqBmon.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection");
options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10),
"Non pulser Hits Nb Thr for spill detection");
options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128),
"Interval in seconds between count checks for spill detection");
options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"),
"Set Bmon channel map e.g. 0,1,2,3,4,5,6,7");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"),
"MQ channel name for TS data");
options.add_options()("TsBlockName", bpo::value<std::string>()->default_value(""),
"Block name for requesting TS data, Bmon SysId request if empty");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorReqBmon(); }
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmDeviceMonitorReqT0.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection");
options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10),
"Non pulser Hits Nb Thr for spill detection");
options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128),
"Interval in seconds between count checks for spill detection");
options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"),
"Set T0 channel map e.g. 0,1,2,3,4,5,6,7");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"),
"MQ channel name for TS data");
options.add_options()("TsBlockName", bpo::value<std::string>()->default_value(""),
"Block name for requesting TS data, T0 SysId request if empty");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorReqT0(); }
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
#include "CbmDeviceMonitorT0.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection");
options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10),
"Non pulser Hits Nb Thr for spill detection");
options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128),
"Interval in seconds between count checks for spill detection");
options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"),
"Set T0 channel map e.g. 0,1,2,3,4,5,6,7");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("t0component"),
"MQ channel name for TS data");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorT0(); }
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
((_pubfreqts = $_nbmoni*100 ))
if [ $# -ge 4 ]; then
if [ $# -ge 5 ]; then
if [ $# -ge 6 ]; then
if [ $# -ge 7 ]; then
elif [ $# -ge 2 ]; then
if [ $# -eq 3 ]; then
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
xterm -l -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
xterm -l -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER+=" --id server1"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
xterm -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
((_pubfreqts = $_nbmoni*100 ))
if [ $# -ge 4 ]; then
if [ $# -ge 5 ]; then
if [ $# -ge 6 ]; then
if [ $# -ge 7 ]; then
elif [ $# -ge 2 ]; then
if [ $# -eq 3 ]; then
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S`
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --PulsTotMin 185"
MONITOR+=" --PulsTotMax 190"
MONITOR+=" --SpillThr 200"
MONITOR+=" --ChanMap 4,5,6,7,0,1,2,3"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $MONITOR_LOG -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER+=" --id server1"
HISTSERVER+=" --severity info"
HISTSERVER+=" --histport 8082"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $HISTSRV_LOG -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
......@@ -44,7 +47,7 @@ else
SAMPLER+=" --id sampler1"
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
......@@ -44,8 +47,7 @@ else
SAMPLER+=" --id sampler1"
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
((_pubfreqts = $_nbmoni*100 ))
if [ $# -ge 4 ]; then
if [ $# -ge 5 ]; then
if [ $# -ge 6 ]; then
if [ $# -ge 7 ]; then
elif [ $# -ge 2 ]; then
if [ $# -eq 3 ]; then
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
xterm -l -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
xterm -l -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER+=" --id server1"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
xterm -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
((_pubfreqts = $_nbmoni*100 ))
if [ $# -ge 4 ]; then
if [ $# -ge 5 ]; then
if [ $# -ge 6 ]; then
if [ $# -ge 7 ]; then
elif [ $# -ge 2 ]; then
if [ $# -eq 3 ]; then
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo ''
echo ' <Nb Monitor processes>'
echo ' <Nb Monitor processes> <full filename pattern list>'
echo ' <Nb Monitor processes> <filename pattern> <folder_path>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo ' <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S`
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e $VMCWORKDIR/build/bin/MQ/source/$SAMPLER &
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --PulsTotMin 185"
MONITOR+=" --PulsTotMax 190"
MONITOR+=" --SpillThr 200"
MONITOR+=" --ChanMap 4,5,6,7,0,1,2,3"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://"
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $MONITOR_LOG -geometry 80x23+500+$_yOffset -hold -e $VMCWORKDIR/build/bin/MQ/monitor/$MONITOR &
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1000+0 -hold -e $VMCWORKDIR/build/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER+=" --id server1"
HISTSERVER+=" --severity info"
HISTSERVER+=" --histport 8082"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://"
# Replaces log filename
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
xterm -l -lf $HISTSRV_LOG -geometry 80x23+1500+0 -hold -e $VMCWORKDIR/build/bin/MQ/histogramServer/$HISTSERVER &
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
......@@ -68,7 +71,7 @@ else
SAMPLER+=" --id sampler1"