From d0694b51bfec1e0f009e86f8f4d2eeeb8689d955 Mon Sep 17 00:00:00 2001
From: "P.-A. Loizeau" <p.-a.loizeau@gsi.de>
Date: Wed, 25 May 2022 13:14:03 +0200
Subject: [PATCH] [MQ] Add BMon monitor device

- Add BMon Unpack/Monitor device
- Add possibility to request from the source the start time of the first TS
---
 MQ/mcbm/CMakeLists.txt             |  41 +-
 MQ/mcbm/CbmDeviceBmonMonitor.cxx   | 605 +++++++++++++++++++++++++++++
 MQ/mcbm/CbmDeviceBmonMonitor.h     | 256 ++++++++++++
 MQ/mcbm/runBmonMonitor.cxx         |  41 ++
 MQ/mcbm/startBmonMoni2022.sh.in    | 235 +++++++++++
 MQ/source/CbmMQTsSamplerRepReq.cxx |  52 ++-
 MQ/source/CbmMQTsSamplerRepReq.h   |   2 +
 7 files changed, 1224 insertions(+), 8 deletions(-)
 create mode 100644 MQ/mcbm/CbmDeviceBmonMonitor.cxx
 create mode 100644 MQ/mcbm/CbmDeviceBmonMonitor.h
 create mode 100644 MQ/mcbm/runBmonMonitor.cxx
 create mode 100755 MQ/mcbm/startBmonMoni2022.sh.in

diff --git a/MQ/mcbm/CMakeLists.txt b/MQ/mcbm/CMakeLists.txt
index 98b51503f5..5ae68bb7b2 100644
--- a/MQ/mcbm/CMakeLists.txt
+++ b/MQ/mcbm/CMakeLists.txt
@@ -1,9 +1,10 @@
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmPulserMonitor2020.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmPulserMonitor2020.sh)
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmEvtBuilderWin2020.sh)
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQBuildRawEvents.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQBuildRawEvents.sh)
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2021.sh)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmPulserMonitor2020.sh.in   ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmPulserMonitor2020.sh)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in   ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQMcbmEvtBuilderWin2020.sh)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQBuildRawEvents.sh.in          ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQBuildRawEvents.sh)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2021.sh.in        ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2021.sh)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEventsCosmics2021.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEventsCosmics2021.sh)
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2022.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2022.sh)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBuildRawEvents2022.sh.in        ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBuildRawEvents2022.sh)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startBmonMoni2022.sh.in              ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startBmonMoni2022.sh)
 
 set(INCLUDE_DIRECTORIES
     ${CMAKE_CURRENT_SOURCE_DIR}
@@ -273,6 +274,35 @@ set(DEPENDENCIES
 )
 GENERATE_EXECUTABLE()
 
+
+set(EXE_NAME BmonMonitor)
+set(SRCS CbmDeviceBmonMonitor.cxx runBmonMonitor.cxx)
+
+set(DEPENDENCIES
+  ${DEPENDENCIES}
+  ${FAIR_LIBS}
+  ParBase
+  ${BOOST_LIBS}
+  external::fles_ipc
+  CbmFlibFlesTools
+  CbmBase
+  CbmRecoBase
+  CbmMuchReco
+  CbmPsdReco
+  CbmRichReco
+  CbmRecoSts
+  CbmTofReco
+  CbmTrdReco
+  CbmData
+  CbmSimSteer # for CbmSetup!
+  Core
+  RIO
+  Net
+  Hist
+  RHTTP
+)
+GENERATE_EXECUTABLE()
+
 # Set the correct variables for the installation
 set(VMCWORKDIR ${CMAKE_INSTALL_PREFIX}/share/cbmroot)
 
@@ -288,6 +318,7 @@ configure_file(${MY_SOURCE_DIR}/startMQMcbmEvtBuilderWin2020.sh.in   ${TMPDIR}/b
 configure_file(${MY_SOURCE_DIR}/startMQBuildRawEvents.sh.in          ${TMPDIR}/bin/MQ/topologies/install/startMQBuildRawEvents.sh)
 configure_file(${MY_SOURCE_DIR}/startBuildRawEvents2021.sh.in        ${TMPDIR}/bin/MQ/topologies/install/startBuildRawEvents2021.sh)
 configure_file(${MY_SOURCE_DIR}/startBuildRawEventsCosmics2021.sh.in ${TMPDIR}/bin/MQ/topologies/install/startBuildRawEventsCosmics2021.sh)
+configure_file(${MY_SOURCE_DIR}/startBmonMoni2022.sh.in              ${TMPDIR}/bin/MQ/topologies/install/startBmonMoni2022.sh)
 
 install(PROGRAMS ${TMPDIR}/bin/MQ/topologies/install/startMQMcbmPulserMonitor2020.sh
                  ${TMPDIR}/bin/MQ/topologies/install/startMQMcbmEvtBuilderWin2020.sh
diff --git a/MQ/mcbm/CbmDeviceBmonMonitor.cxx b/MQ/mcbm/CbmDeviceBmonMonitor.cxx
new file mode 100644
index 0000000000..38dcf00145
--- /dev/null
+++ b/MQ/mcbm/CbmDeviceBmonMonitor.cxx
@@ -0,0 +1,605 @@
+/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
+   SPDX-License-Identifier: GPL-3.0-only
+   Authors: Pierre-Alain Loizeau [committer] */
+
+/**
+ * CbmDeviceBmonMonitor.cxx
+ *
+ * @since 2022-05-23
+ * @author P.-A. Loizeau
+ */
+
+#include "CbmDeviceBmonMonitor.h"
+
+#include "CbmBmonUnpackConfig.h"
+#include "CbmFlesCanvasTools.h"
+#include "CbmMQDefs.h"
+#include "CbmMuchUnpackConfig.h"
+#include "CbmPsdUnpackConfig.h"
+#include "CbmRichUnpackConfig.h"
+#include "CbmSetup.h"
+#include "CbmStsUnpackConfig.h"
+#include "CbmTofUnpackConfig.h"
+#include "CbmTofUnpackMonitor.h"
+#include "CbmTrdUnpackConfig.h"
+#include "CbmTrdUnpackFaspConfig.h"
+
+#include "StorableTimeslice.hpp"
+#include "TimesliceMetaData.h"
+
+#include "FairMQLogger.h"
+#include "FairMQProgOptions.h"  // device->fConfig
+#include "FairParGenericSet.h"
+
+#include "TCanvas.h"
+#include "TFile.h"
+#include "TH1.h"
+#include "TList.h"
+#include "TNamed.h"
+
+#include "BoostSerializer.h"
+#include <boost/archive/binary_iarchive.hpp>
+#include <boost/serialization/utility.hpp>
+
+#include <array>
+#include <iomanip>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include "RootSerializer.h"
+struct InitTaskError : std::runtime_error {
+  using std::runtime_error::runtime_error;
+};
+
+using namespace std;
+
+//Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE;
+
+CbmDeviceBmonMonitor::CbmDeviceBmonMonitor() {}
+
+void CbmDeviceBmonMonitor::InitTask()
+try {
+  /// Read options from executable
+  LOG(info) << "Init options for CbmDeviceBmonMonitor.";
+  fsSetupName              = fConfig->GetValue<std::string>("Setup");
+  fuRunId                  = fConfig->GetValue<uint32_t>("RunId");
+  fbUnpBmon                = fConfig->GetValue<bool>("UnpBmon");
+  fbIgnoreOverlapMs        = fConfig->GetValue<bool>("IgnOverMs");
+  fbOutputFullTimeSorting  = fConfig->GetValue<bool>("FullTimeSort");
+  fvsSetTimeOffs           = fConfig->GetValue<std::vector<std::string>>("SetTimeOffs");
+  fsChannelNameDataInput   = fConfig->GetValue<std::string>("TsNameIn");
+  fsChannelNameDataOutput  = fConfig->GetValue<std::string>("TsNameOut");
+  fuPublishFreqTs          = fConfig->GetValue<uint32_t>("PubFreqTs");
+  fdMinPublishTime         = fConfig->GetValue<double_t>("PubTimeMin");
+  fdMaxPublishTime         = fConfig->GetValue<double_t>("PubTimeMax");
+  fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
+}
+catch (InitTaskError& e) {
+  LOG(error) << e.what();
+  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
+  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
+}
+
+Bool_t CbmDeviceBmonMonitor::InitContainers()
+{
+  LOG(info) << "Init parameter containers for CbmDeviceBmonMonitor.";
+
+  // ----- FIXME: Environment settings? or binary option?
+  TString srcDir = std::getenv("VMCWORKDIR");  // top source directory, standard C++ library
+  //  TString srcDir = gSystem->Getenv("VMCWORKDIR");  // top source directory
+
+  // -----   CbmSetup   -----------------------------------------------------
+  // TODO: support for multiple setups on Par Server? with request containing setup name?
+  CbmSetup* cbmsetup = CbmSetup::Instance();
+  FairMQMessagePtr req(NewSimpleMessage("setup"));
+  FairMQMessagePtr rep(NewMessage());
+
+  if (Send(req, "parameters") > 0) {
+    if (Receive(rep, "parameters") >= 0) {
+      if (0 != rep->GetSize()) {
+        CbmSetupStorable* exchangableSetup;
+
+        CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
+        exchangableSetup = dynamic_cast<CbmSetupStorable*>(tmsg.ReadObject(tmsg.GetClass()));
+
+        if (nullptr != exchangableSetup) {
+          /// Prevent clang format single line if
+          cbmsetup->LoadStoredSetup(exchangableSetup);
+        }
+        else {
+          LOG(error) << "Received corrupt reply. Setup not available";
+          throw InitTaskError("Setup not received from par-server.");
+        }
+      }  // if( 0 !=  rep->GetSize() )
+      else {
+        LOG(error) << "Received empty reply. Setup not available";
+        throw InitTaskError("Setup not received from par-server.");
+      }  // else of if( 0 !=  rep->GetSize() )
+    }    // if( Receive( rep, "parameters" ) >= 0)
+  }      // if( Send(req, "parameters") > 0 )
+  // ------------------------------------------------------------------------
+
+  /// Initialize the UnpackerConfigs objects and their "user options"
+  // ---- BMON ----
+  std::shared_ptr<CbmBmonUnpackConfig> bmonconfig = nullptr;
+  if (fbUnpBmon) {
+    bmonconfig = std::make_shared<CbmBmonUnpackConfig>("", fuRunId);
+    if (bmonconfig) {
+      // bmonconfig->SetDebugState();
+      bmonconfig->SetDoWriteOutput();
+      // bmonconfig->SetDoWriteOptOutA("CbmBmonErrors");
+      std::string parfilesbasepathBmon = Form("%s/macro/beamtime/mcbm2022/", srcDir.Data());
+      bmonconfig->SetParFilesBasePath(parfilesbasepathBmon);
+      bmonconfig->SetParFileName("mBmonCriPar.par");
+      bmonconfig->SetSystemTimeOffset(-1220);  // [ns] value to be updated
+
+      /// Enable Monitor plots
+      auto monitor = std::make_shared<CbmTofUnpackMonitor>();
+      monitor->SetBmonMode(true);
+      monitor->SetInternalHttpMode(false);
+      if (2337 <= fuRunId) {
+        monitor->SetSpillThreshold(250);
+        monitor->SetSpillThresholdNonPulser(100);
+      }
+      bmonconfig->SetMonitor(monitor);
+    }
+  }
+  // -------------
+
+  /// Enable full time sorting instead of time sorting per FLIM link
+  if (bmonconfig) SetUnpackConfig(bmonconfig);
+
+  /// Load time offsets
+  for (std::vector<std::string>::iterator itStrOffs = fvsSetTimeOffs.begin(); itStrOffs != fvsSetTimeOffs.end();
+       ++itStrOffs) {
+    size_t charPosDel = (*itStrOffs).find(',');
+    if (std::string::npos == charPosDel) {
+      LOG(info) << "CbmDeviceBmonMonitor::InitContainers => "
+                << "Trying to set trigger window with invalid option pattern, ignored! "
+                << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << (*itStrOffs) << " )";
+    }  // if( std::string::npos == charPosDel )
+
+    /// Detector Enum Tag
+    std::string sSelDet = (*itStrOffs).substr(0, charPosDel);
+    /// Min number
+    charPosDel++;
+    int32_t iOffset = std::stoi((*itStrOffs).substr(charPosDel));
+
+    if ("kT0" == sSelDet && fBmonConfig) {  //
+      fBmonConfig->SetSystemTimeOffset(iOffset);
+    }  // else if( "kT0" == sSelDet )
+    else {
+      LOG(info) << "CbmDeviceBmonMonitor::InitContainers => Trying to set time "
+                   "offset for unsupported detector, ignored! "
+                << (sSelDet);
+      continue;
+    }  // else of detector enum detection
+  }  // for( std::vector< std::string >::iterator itStrAdd = fvsAddDet.begin(); itStrAdd != fvsAddDet.end(); ++itStrAdd )
+
+  Bool_t initOK = kTRUE;
+  // --- Bmon
+  if (fBmonConfig) {
+    fBmonConfig->InitOutput();
+    //    RegisterOutputs(ioman, fBmonConfig);  /// Framework bound work = kept in this Task
+    fBmonConfig->SetAlgo();
+    fBmonConfig->LoadParFileName();  /// Needed to change the Parameter file name before it is used!!!
+    initOK &= InitParameters(fBmonConfig->GetParContainerRequest());  /// Framework bound work = kept in this Device
+    fBmonConfig->InitAlgo();
+    // initPerformanceMaps(fkFlesBmon, "Bmon");
+  }
+
+  /// Event header object
+  fCbmTsEventHeader = new CbmTsEventHeader();
+
+  return initOK;
+}
+
+Bool_t
+CbmDeviceBmonMonitor::InitParameters(std::vector<std::pair<std::string, std::shared_ptr<FairParGenericSet>>>* reqparvec)
+{
+  LOG(info) << "CbmDeviceBmonMonitor::InitParameters";
+  if (!reqparvec) {
+    LOG(info) << "CbmDeviceBmonMonitor::InitParameters - empty requirements vector no parameters initialized.";
+    return kTRUE;
+  }
+
+  // Now get the actual ascii files and init the containers with the asciiIo
+  for (auto& pair : *reqparvec) {
+    /*
+    auto filepath = pair.first;
+    auto parset   = pair.second;
+    FairParAsciiFileIo asciiInput;
+    if (!filepath.empty()) {
+      if (asciiInput.open(filepath.data())) { parset->init(&asciiInput); }
+    }
+    * */
+    std::string paramName {pair.second->GetName()};
+    // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
+    // Should only be used for small data because of the cost of an additional copy
+
+    // Here must come the proper Runid
+    std::string message = paramName + ",111";
+    LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
+
+    FairMQMessagePtr req(NewSimpleMessage(message));
+    FairMQMessagePtr rep(NewMessage());
+
+    FairParGenericSet* newObj = nullptr;
+
+    if (Send(req, "parameters") > 0) {
+      if (Receive(rep, "parameters") >= 0) {
+        if (0 != rep->GetSize()) {
+          CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
+          newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
+          LOG(info) << "Received unpack parameter from the server: " << newObj->GetName();
+          newObj->print();
+        }  // if( 0 !=  rep->GetSize() )
+        else {
+          LOG(error) << "Received empty reply. Parameter not available";
+          return kFALSE;
+        }                       // else of if( 0 !=  rep->GetSize() )
+      }                         // if( Receive( rep, "parameters" ) >= 0)
+    }                           // if( Send(req, "parameters") > 0 )
+    pair.second.reset(newObj);  /// Potentially unsafe reasignment of raw pointer to the shared pointer?
+    //delete newObj;
+  }
+  return kTRUE;
+}
+
+bool CbmDeviceBmonMonitor::InitHistograms()
+{
+  /// Histos creation and obtain pointer on them
+  /// Trigger histo creation on all associated algos
+  // ALGO: bool initOK = fMonitorAlgo->CreateHistograms();
+  bool initOK = true;
+
+  /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
+  // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
+  std::vector<std::pair<TNamed*, std::string>> vHistos = fBmonConfig->GetMonitor()->GetHistoVector();
+  /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
+  // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
+  std::vector<std::pair<TCanvas*, std::string>> vCanvases = fBmonConfig->GetMonitor()->GetCanvasVector();
+
+  /// Add pointers to each histo in the histo array
+  /// Create histo config vector
+  /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
+  ///      and send it through a separate channel using the BoostSerializer
+  for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
+    //         LOG(info) << "Registering  " << vHistos[ uHisto ].first->GetName()
+    //                   << " in " << vHistos[ uHisto ].second.data()
+    //                   ;
+    fArrayHisto.Add(vHistos[uHisto].first);
+    std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
+    fvpsHistosFolder.push_back(psHistoConfig);
+
+    LOG(info) << "Config of hist  " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
+  }  // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
+
+  /// Create canvas config vector
+  /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
+  ///      and send it through a separate channel using the BoostSerializer
+  for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
+    //         LOG(info) << "Registering  " << vCanvases[ uCanv ].first->GetName()
+    //                   << " in " << vCanvases[ uCanv ].second.data();
+    std::string sCanvName = (vCanvases[uCanv].first)->GetName();
+    std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
+
+    std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
+
+    fvpsCanvasConfig.push_back(psCanvConfig);
+
+    LOG(info) << "Config string of Canvas  " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
+  }  //  for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
+
+  return initOK;
+}
+
+// Method called by run loop and requesting new data from the TS source whenever
+bool CbmDeviceBmonMonitor::ConditionalRun()
+{
+  /// First do Algo related Initialization steps if needed
+  if (0 == fulNumMessages) {
+    try {
+      InitContainers();
+    }
+    catch (InitTaskError& e) {
+      LOG(error) << e.what();
+      ChangeState(fair::mq::Transition::ErrorFound);
+    }
+  }  // if( 0 == fulNumMessages)
+
+  if (0 == fulNumMessages) InitHistograms();
+
+  /// If first TS of this device, ask for the start time (lead to skip of 1 TS for 1st request)
+  if (!fbStartTimeSet) {
+    /// Request the start time
+    std::string message = "SendFirstTimesliceIndex";
+    LOG(debug) << "Requesting start time by sending message: SendFirstTimesliceIndex" << message;
+    FairMQMessagePtr req(NewSimpleMessage(message));
+    FairMQMessagePtr rep(NewMessage());
+
+    if (Send(req, fsChannelNameDataInput) <= 0) {
+      LOG(error) << "Failed to send the request! message was " << message;
+      return false;
+    }  // if (Send(req, fsChannelNameDataInput) <= 0)
+    else if (Receive(rep, fsChannelNameDataInput) < 0) {
+      LOG(error) << "Failed to receive a reply to the request! message was " << message;
+      return false;
+    }  // else if (Receive(rep, fsChannelNameDataInput) < 0)
+    else if (rep->GetSize() == 0) {
+      LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
+      return false;
+    }  // else if (rep->GetSize() == 0)
+    std::string sReply;
+    std::string msgStrRep(static_cast<char*>(rep->GetData()), rep->GetSize());
+    std::istringstream issRep(msgStrRep);
+    boost::archive::binary_iarchive inputArchiveRep(issRep);
+    inputArchiveRep >> sReply;
+
+    fBmonConfig->GetMonitor()->SetHistosStartTime((1e-9) * static_cast<double>(std::stoul(sReply)));
+    fbStartTimeSet = true;
+  }
+
+  /// First request a new TS (full one)
+  std::string message = "full";
+  LOG(debug) << "Requesting new TS by sending message: full" << message;
+  FairMQMessagePtr req(NewSimpleMessage(message));
+  FairMQMessagePtr rep(NewMessage());
+
+  if (Send(req, fsChannelNameDataInput) <= 0) {
+    LOG(error) << "Failed to send the request! message was " << message;
+    return false;
+  }  // if (Send(req, fsChannelNameDataInput) <= 0)
+  else if (Receive(rep, fsChannelNameDataInput) < 0) {
+    LOG(error) << "Failed to receive a reply to the request! message was " << message;
+    return false;
+  }  // else if (Receive(rep, fsChannelNameDataInput) < 0)
+  else if (rep->GetSize() == 0) {
+    LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
+    return false;
+  }  // else if (rep->GetSize() == 0)
+
+  fulNumMessages++;
+  LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize();
+
+  if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
+
+  std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
+  std::istringstream iss(msgStr);
+  boost::archive::binary_iarchive inputArchive(iss);
+
+  /// Create an empty TS and fill it with the incoming message
+  fles::StorableTimeslice ts {0};
+  inputArchive >> ts;
+
+  /// On first TS, extract the TS parameters from header (by definition stable over time)
+  if (-1.0 == fdTsCoreSizeInNs) {
+    fuNbCoreMsPerTs  = ts.num_core_microslices();
+    fuNbOverMsPerTs  = ts.num_microslices(0) - ts.num_core_microslices();
+    fdMsSizeInNs     = (ts.descriptor(0, fuNbCoreMsPerTs).idx - ts.descriptor(0, 0).idx) / fuNbCoreMsPerTs;
+    fdTsCoreSizeInNs = fdMsSizeInNs * (fuNbCoreMsPerTs);
+    fdTsOverSizeInNs = fdMsSizeInNs * (fuNbOverMsPerTs);
+    fdTsFullSizeInNs = fdTsCoreSizeInNs + fdTsOverSizeInNs;
+    LOG(info) << "Timeslice parameters: each TS has " << fuNbCoreMsPerTs << " Core MS and " << fuNbOverMsPerTs
+              << " Overlap MS, for a MS duration of " << fdMsSizeInNs << " ns, a core duration of " << fdTsCoreSizeInNs
+              << " ns and a full duration of " << fdTsFullSizeInNs << " ns";
+    fTsMetaData = new TimesliceMetaData(ts.descriptor(0, 0).idx, fdTsCoreSizeInNs, fdTsOverSizeInNs, ts.index());
+  }  // if( -1.0 == fdTsCoreSizeInNs )
+  else {
+    /// Update only the fields changing from TS to TS
+    fTsMetaData->SetStartTime(ts.descriptor(0, 0).idx);
+    fTsMetaData->SetIndex(ts.index());
+  }
+
+  /// Process the Timeslice
+  DoUnpack(ts, 0);
+
+  // Reset the event header for a new timeslice
+  fCbmTsEventHeader->Reset();
+
+  // Reset the unpackers for a new timeslice, e.g. clear the output vectors
+  // ---- Bmon ----
+  if (fBmonConfig) fBmonConfig->Reset();
+
+  /// Send histograms each 100 time slices. Should be each ~1s
+  /// Use also runtime checker to trigger sending after M s if
+  /// processing too slow or delay sending if processing too fast
+  std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
+  std::chrono::duration<double_t> elapsedSeconds    = currentTime - fLastPublishTime;
+  if ((fdMaxPublishTime < elapsedSeconds.count())
+      || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
+    if (!fbConfigSent) {
+      // Send the configuration only once per run!
+      fbConfigSent = SendHistoConfAndData();
+    }  // if( !fbConfigSent )
+    else
+      SendHistograms();
+
+    fLastPublishTime = std::chrono::system_clock::now();
+  }  // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
+
+  return true;
+}
+
+bool CbmDeviceBmonMonitor::SendUnpData()
+{
+  FairMQParts parts;
+
+  /// Prepare serialized versions of the TS Event header
+  FairMQMessagePtr messTsHeader(NewMessage());
+  //  Serialize<RootSerializer>(*messTsHeader, fCbmTsEventHeader);
+  RootSerializer().Serialize(*messTsHeader, fCbmTsEventHeader);
+
+  parts.AddPart(std::move(messTsHeader));
+
+  // ---- T0 ----
+  std::stringstream ossBmon;
+  boost::archive::binary_oarchive oaBmon(ossBmon);
+  if (fBmonConfig) {  //
+    oaBmon << *(fBmonConfig->GetOutputVec());
+  }
+  else {
+    oaBmon << (std::vector<CbmTofDigi>());
+  }
+  std::string* strMsgBmon = new std::string(ossBmon.str());
+
+  parts.AddPart(NewMessage(
+    const_cast<char*>(strMsgBmon->c_str()),  // data
+    strMsgBmon->length(),                    // size
+    [](void*, void* object) { delete static_cast<std::string*>(object); },
+    strMsgBmon));  // object that manages the data
+
+  /// Prepare serialized versions of the TS Meta
+  /// FIXME: only for TS duration and overlap, should be sent to parameter service instead as stable values in run
+  ///        Index and start time are already included in the TsHeader object!
+  FairMQMessagePtr messTsMeta(NewMessage());
+  //  Serialize<RootSerializer>(*messTsMeta, fTsMetaData);
+  RootSerializer().Serialize(*messTsMeta, fTsMetaData);
+  parts.AddPart(std::move(messTsMeta));
+
+  if (Send(parts, fsChannelNameDataOutput) < 0) {
+    LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
+    return false;
+  }
+
+  return true;
+}
+
+
+bool CbmDeviceBmonMonitor::SendHistoConfAndData()
+{
+  /// Prepare multiparts message and header
+  std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
+  FairMQMessagePtr messageHeader(NewMessage());
+  //  Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
+  BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
+  FairMQParts partsOut;
+  partsOut.AddPart(std::move(messageHeader));
+
+  for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
+    /// Serialize the vector of histo config into a single MQ message
+    FairMQMessagePtr messageHist(NewMessage());
+    //    Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
+    BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
+
+    partsOut.AddPart(std::move(messageHist));
+  }  // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
+
+  /// Catch case where no histos are registered!
+  /// => Add empty message
+  if (0 == fvpsHistosFolder.size()) {
+    FairMQMessagePtr messageHist(NewMessage());
+    partsOut.AddPart(std::move(messageHist));
+  }
+
+  for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
+    /// Serialize the vector of canvas config into a single MQ message
+    FairMQMessagePtr messageCan(NewMessage());
+    //    Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
+    BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
+
+    partsOut.AddPart(std::move(messageCan));
+  }  // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
+
+  /// Catch case where no Canvases are registered!
+  /// => Add empty message
+  if (0 == fvpsCanvasConfig.size()) {
+    FairMQMessagePtr messageHist(NewMessage());
+    partsOut.AddPart(std::move(messageHist));
+  }
+
+  /// Serialize the array of histos into a single MQ message
+  FairMQMessagePtr msgHistos(NewMessage());
+  //  Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
+  RootSerializer().Serialize(*msgHistos, &fArrayHisto);
+  partsOut.AddPart(std::move(msgHistos));
+
+  /// Send the multi-parts message to the common histogram messages queue
+  if (Send(partsOut, fsChannelNameHistosInput) < 0) {
+    LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
+    return false;
+  }  // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
+
+  /// Reset the histograms after sending them (but do not reset the time)
+  // ALGO: fMonitorAlgo->ResetHistograms(kFALSE);
+  fBmonConfig->GetMonitor()->ResetHistograms();
+  fBmonConfig->GetMonitor()->ResetBmonHistograms(kFALSE);
+
+  return true;
+}
+
+bool CbmDeviceBmonMonitor::SendHistograms()
+{
+  /// Serialize the array of histos into a single MQ message
+  FairMQMessagePtr message(NewMessage());
+  //  Serialize<RootSerializer>(*message, &fArrayHisto);
+  RootSerializer().Serialize(*message, &fArrayHisto);
+
+  /// Send message to the common histogram messages queue
+  if (Send(message, fsChannelNameHistosInput) < 0) {
+    LOG(error) << "Problem sending data";
+    return false;
+  }  // if( Send( message, fsChannelNameHistosInput ) < 0 )
+
+  /// Reset the histograms after sending them (but do not reset the time)
+  // ALGO: fMonitorAlgo->ResetHistograms(kFALSE);
+
+  return true;
+}
+
+
+CbmDeviceBmonMonitor::~CbmDeviceBmonMonitor()
+{
+  if (fBmonConfig) fBmonConfig->GetUnpacker()->Finish();
+}
+
+Bool_t CbmDeviceBmonMonitor::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
+{
+  fulTsCounter++;
+  // Prepare timeslice
+  //  const fles::Timeslice& timeslice = *ts;
+
+  fCbmTsEventHeader->SetTsIndex(ts.index());
+  fCbmTsEventHeader->SetTsStartTime(ts.start_time());
+
+  uint64_t nComponents = ts.num_components();
+  // if (fDoDebugPrints) LOG(info) << "Unpack: TS index " << ts.index() << " components " << nComponents;
+  LOG(debug) << "Unpack: TS index " << ts.index() << " components " << nComponents;
+
+  for (uint64_t component = 0; component < nComponents; component++) {
+    auto systemId = static_cast<std::uint16_t>(ts.descriptor(component, 0).sys_id);
+
+    switch (systemId) {
+      case fkFlesBmon: {
+        if (fBmonConfig) {
+          fCbmTsEventHeader->AddNDigisBmon(
+            unpack(systemId, &ts, component, fBmonConfig, fBmonConfig->GetOptOutAVec(), fBmonConfig->GetOptOutBVec()));
+        }
+        break;
+      }
+      default: {
+        if (fDoDebugPrints) LOG(error) << "Unpack: Unknown system ID " << systemId << " for component " << component;
+        break;
+      }
+    }
+  }
+
+  if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
+
+  return kTRUE;
+}
+/**
+ * @brief Get the Trd Spadic
+ * @return std::shared_ptr<CbmTrdSpadic>
+*/
+std::shared_ptr<CbmTrdSpadic> CbmDeviceBmonMonitor::GetTrdSpadic(bool useAvgBaseline)
+{
+  auto spadic = std::make_shared<CbmTrdSpadic>();
+  spadic->SetUseBaselineAverage(useAvgBaseline);
+  spadic->SetMaxAdcToEnergyCal(1.0);
+
+  return spadic;
+}
+
+void CbmDeviceBmonMonitor::Finish() {}
diff --git a/MQ/mcbm/CbmDeviceBmonMonitor.h b/MQ/mcbm/CbmDeviceBmonMonitor.h
new file mode 100644
index 0000000000..fc607dc8cb
--- /dev/null
+++ b/MQ/mcbm/CbmDeviceBmonMonitor.h
@@ -0,0 +1,256 @@
+/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
+   SPDX-License-Identifier: GPL-3.0-only
+   Authors: Pierre-Alain Loizeau [committer] */
+
+/**
+ * CbmDeviceBmonMonitor.h
+ *
+ * @since 2022-05-23
+ * @author P.-A. Loizeau
+ */
+
+#ifndef CBMDEVICEBMONMONI_H_
+#define CBMDEVICEBMONMONI_H_
+
+#include "CbmMqTMessage.h"
+#include "CbmTsEventHeader.h"
+
+#include "Timeslice.hpp"
+
+#include "FairMQDevice.h"
+#include "FairParGenericSet.h"
+
+#include "Rtypes.h"
+#include "TObjArray.h"
+
+#include <chrono>
+#include <map>
+#include <vector>
+
+class TList;
+class CbmBmonUnpackConfig;
+
+class TimesliceMetaData;
+
+class CbmTrdSpadic;
+
+class CbmDeviceBmonMonitor : public FairMQDevice {
+public:
+  CbmDeviceBmonMonitor();
+  virtual ~CbmDeviceBmonMonitor();
+
+protected:
+  virtual void InitTask();
+  bool ConditionalRun();
+  bool HandleCommand(FairMQMessagePtr&, int);
+
+  /** @brief Set the Bmon Unpack Config @param config */
+  void SetUnpackConfig(std::shared_ptr<CbmBmonUnpackConfig> config) { fBmonConfig = config; }
+
+private:
+  /// Constants
+  static constexpr std::uint16_t fkFlesBmon = static_cast<std::uint16_t>(fles::SubsystemIdentifier::T0);
+
+
+  /// Control flags
+  Bool_t fbIgnoreOverlapMs       = false;  //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
+  Bool_t fbComponentsAddedToList = kFALSE;
+  bool fbStartTimeSet            = false;
+
+  /** @brief Flag if extended debug output is to be printed or not*/
+  bool fDoDebugPrints = false;  //!
+  /** @brief Flag if performance profiling should be activated or not.*/
+  bool fDoPerfProf = false;  //!
+  /** @brief Flag to Enable/disable a full time sorting. If off, time sorting happens per link/FLIM source */
+  bool fbOutputFullTimeSorting = false;
+
+  /// User settings parameters
+  std::string fsSetupName = "mcbm_beam_2021_07_surveyed";
+  uint32_t fuRunId        = 1588;
+  /// ---> for selective unpacking
+  bool fbUnpBmon = true;
+  /// message queues
+  std::string fsChannelNameDataInput   = "ts-request";
+  std::string fsChannelNameDataOutput  = "unpts_0";
+  std::string fsChannelNameCommands    = "commands";
+  std::string fsChannelNameHistosInput = "histogram-in";
+  /// Histograms management
+  uint32_t fuPublishFreqTs  = 100;
+  double_t fdMinPublishTime = 0.5;
+  double_t fdMaxPublishTime = 5.0;
+
+  /// Parameters management
+  //      TList* fParCList = nullptr;
+  Bool_t InitParameters(std::vector<std::pair<std::string, std::shared_ptr<FairParGenericSet>>>* reqparvec);
+
+  /// Statistics & first TS rejection
+  uint64_t fulNumMessages                                = 0;
+  uint64_t fulTsCounter                                  = 0;
+  std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
+  /** @brief Map to store a name for the unpackers and the processed amount of digis, key = fkFlesId*/
+  std::map<std::uint16_t, std::pair<std::string, size_t>> fNameMap = {};  //!
+  /** @brief Map to store the cpu and wall time, key = fkFlesId*/
+  std::map<std::uint16_t, std::pair<double, double>> fTimeMap = {};  //!
+  /** @brief Map to store the in and out data amount, key = fkFlesId*/
+  std::map<std::uint16_t, std::pair<double, double>> fDataSizeMap = {};  //!
+
+  /// Configuration of the unpackers. Provides the configured algorithm
+  std::shared_ptr<CbmBmonUnpackConfig> fBmonConfig = nullptr;
+
+  /// Pointer to the Timeslice header conatining start time and index
+  CbmTsEventHeader* fCbmTsEventHeader = nullptr;
+
+  /// Time offsets
+  std::vector<std::string> fvsSetTimeOffs = {};
+
+  /// TS MetaData storage: stable so should be moved somehow to parameters handling (not transmitted with each TS
+  size_t fuNbCoreMsPerTs    = 0;     //!
+  size_t fuNbOverMsPerTs    = 0;     //!
+  Double_t fdMsSizeInNs     = 0;     //! Size of a single MS, [nanoseconds]
+  Double_t fdTsCoreSizeInNs = -1.0;  //! Total size of the core MS in a TS, [nanoseconds]
+  Double_t fdTsOverSizeInNs = -1.0;  //! Total size of the overlap MS in a TS, [nanoseconds]
+  Double_t fdTsFullSizeInNs = -1.0;  //! Total size of all MS in a TS, [nanoseconds]
+  TimesliceMetaData* fTsMetaData;
+
+  /// Array of histograms to send to the histogram server
+  TObjArray fArrayHisto = {};
+  /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
+  std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
+  /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
+  /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
+  /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
+  std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
+  /// Flag indicating whether the histograms and canvases configurations were already published
+  bool fbConfigSent = false;
+
+  Bool_t InitContainers();
+  bool InitHistograms();
+  Bool_t DoUnpack(const fles::Timeslice& ts, size_t component);
+  void Finish();
+  bool SendUnpData();
+  bool SendHistoConfAndData();
+  bool SendHistograms();
+
+  std::shared_ptr<CbmTrdSpadic> GetTrdSpadic(bool useAvgBaseline);
+
+  /** @brief Sort a vector timewise vector type has to provide GetTime() */
+  template<typename TVecobj>
+  typename std::enable_if<std::is_same<TVecobj, std::nullptr_t>::value == true, void>::type
+  timesort(std::vector<TVecobj>* /*vec = nullptr*/)
+  {
+    LOG(debug)
+      << "CbmDeviceBmonMonitor::timesort() got an object that has no member function GetTime(). Hence, we can and "
+         "will not timesort it!";
+  }
+
+  template<typename TVecobj>
+  typename std::enable_if<!std::is_member_function_pointer<decltype(&TVecobj::GetTime)>::value, void>::type
+  timesort(std::vector<TVecobj>* /*vec = nullptr*/)
+  {
+    LOG(debug) << "CbmDeviceBmonMonitor::timesort() " << TVecobj::Class_Name()
+               << "is  an object that has no member function GetTime(). Hence, we can and "
+                  "will not timesort it!";
+  }
+
+  template<typename TVecobj>
+  typename std::enable_if<std::is_member_function_pointer<decltype(&TVecobj::GetTime)>::value, void>::type
+  timesort(std::vector<TVecobj>* vec = nullptr)
+  {
+    if (vec == nullptr) return;
+    std::sort(vec->begin(), vec->end(),
+              [](const TVecobj& a, const TVecobj& b) -> bool { return a.GetTime() < b.GetTime(); });
+  }
+
+  /**
+   * @brief Template for the unpacking call of a given algorithm.
+   *
+   * @tparam TAlgo Algorithm to be called
+   * @tparam TOutput Output element types
+   * @tparam TOptoutputs Optional output element types
+   * @param ts Timeslice
+   * @param icomp Component number
+   * @param algo Algorithm to be used for this component
+   * @param outtargetvec Target vector for the output elements
+   * @param optoutputvecs Target vectors for optional outputs
+   * @return std::pair<ndigis, std::pair<cputime, walltime>>
+  */
+  template<class TConfig, class TOptOutA = std::nullptr_t, class TOptOutB = std::nullptr_t>
+  size_t unpack(const std::uint16_t subsysid, const fles::Timeslice* ts, std::uint16_t icomp, TConfig config,
+                std::vector<TOptOutA>* optouttargetvecA = nullptr, std::vector<TOptOutB>* optouttargetvecB = nullptr)
+  {
+
+    auto wallstarttime        = std::chrono::high_resolution_clock::now();
+    std::clock_t cpustarttime = std::clock();
+
+    auto algo                        = config->GetUnpacker();
+    std::vector<TOptOutA> optoutAvec = {};
+    std::vector<TOptOutB> optoutBvec = {};
+    if (optouttargetvecA) { algo->SetOptOutAVec(&optoutAvec); }
+    if (optouttargetvecB) { algo->SetOptOutBVec(&optoutBvec); }
+
+    // Set the start time of the current TS for this algorithm
+    algo->SetTsStartTime(ts->start_time());
+
+    // Run the actual unpacking
+    auto digivec = algo->Unpack(ts, icomp);
+
+    // Check if we want to write the output to somewhere (in pure online monitoring mode for example this can/would/should be skipped)
+    if (config->GetOutputVec()) {
+      // Lets do some time-sorting if we are not doing it later
+      if (!fbOutputFullTimeSorting) timesort(&digivec);
+
+      // Transfer the data from the timeslice vector to the target branch vector
+      // Digis/default output retrieved as offered by the algorithm
+      for (auto digi : digivec)
+        config->GetOutputVec()->emplace_back(digi);
+    }
+    if (optouttargetvecA) {
+      // Lets do some timesorting
+      if (!fbOutputFullTimeSorting) timesort(&optoutAvec);
+      // Transfer the data from the timeslice vector to the target branch vector
+      for (auto optoutA : optoutAvec)
+        optouttargetvecA->emplace_back(optoutA);
+    }
+    if (optouttargetvecB) {
+      // Second opt output is not time sorted to allow non GetTime data container.
+      // Lets do some timesorting
+      timesort(&optoutAvec);
+      // Transfer the data from the timeslice vector to the target branch vector
+      for (auto optoutB : optoutBvec)
+        optouttargetvecB->emplace_back(optoutB);
+    }
+
+    std::clock_t cpuendtime = std::clock();
+    auto wallendtime        = std::chrono::high_resolution_clock::now();
+
+    // Cpu time in [µs]
+    auto cputime = 1e6 * (cpuendtime - cpustarttime) / CLOCKS_PER_SEC;
+    algo->AddCpuTime(cputime);
+    // Real time in [µs]
+    auto walltime = std::chrono::duration<double, std::micro>(wallendtime - wallstarttime).count();
+    algo->AddWallTime(walltime);
+
+
+    // Check some numbers from this timeslice
+    size_t nDigis = digivec.size();
+    LOG(debug) << "Component " << icomp << " connected to config " << config->GetName() << "   n-Digis " << nDigis
+               << " processed in walltime(cputime) = " << walltime << "(" << cputime << cputime << ") µs"
+               << "this timeslice.";
+
+    if (fDoPerfProf) {
+      auto timeit = fTimeMap.find(subsysid);
+      timeit->second.first += cputime;
+      timeit->second.second += walltime;
+
+      auto datait = fDataSizeMap.find(subsysid);
+      datait->second.first += ts->size_component(icomp) / 1.0e6;
+      datait->second.second += nDigis * algo->GetOutputObjSize() / 1.0e6;
+
+      fNameMap.find(subsysid)->second.second += nDigis;
+    }
+
+    return nDigis;
+  }
+};
+
+#endif /* CBMDEVICEMCBMUNPACK_H_ */
diff --git a/MQ/mcbm/runBmonMonitor.cxx b/MQ/mcbm/runBmonMonitor.cxx
new file mode 100644
index 0000000000..43177e5238
--- /dev/null
+++ b/MQ/mcbm/runBmonMonitor.cxx
@@ -0,0 +1,41 @@
+/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
+   SPDX-License-Identifier: GPL-3.0-only
+   Authors: Pierre-Alain Loizeau [committer] */
+
+#include "CbmDeviceBmonMonitor.h"
+
+#include <iomanip>
+#include <string>
+
+#include "runFairMQDevice.h"
+
+namespace bpo = boost::program_options;
+using namespace std;
+
+void addCustomOptions(bpo::options_description& options)
+{
+  options.add_options()("Setup", bpo::value<std::string>()->default_value("mcbm_beam_2021_07_surveyed"),
+                        "Name/tag of the geomatry setup");
+  options.add_options()("RunId", bpo::value<uint32_t>()->default_value(1588), "Run ID");
+  options.add_options()("UnpBmon", bpo::value<bool>()->default_value(false), "Enable Bmon unpacking if true");
+  options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
+  options.add_options()("FullTimeSort", bpo::value<bool>()->default_value(true),
+                        "Full time sorting per detector before sending output array");
+  options.add_options()("SetTimeOffs", bpo::value<std::vector<std::string>>()->multitoken()->composing(),
+                        "Set time offset in ns for selected detector, use string matching "
+                        "ECbmModuleId,dOffs e.g. kTof,-35.2");
+  options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"),
+                        "MQ channel name for raw TS data");
+  options.add_options()("TsNameOut", bpo::value<std::string>()->default_value("unpts_0"),
+                        "MQ channel name for unpacked TS data");
+
+  options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(0), "Histo publishing frequency in TS");
+  options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
+                        "Minimal time between two publishing");
+  options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
+                        "Maximal time between two publishing");
+  options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
+                        "MQ channel name for histos");
+}
+
+FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceBmonMonitor(); }
diff --git a/MQ/mcbm/startBmonMoni2022.sh.in b/MQ/mcbm/startBmonMoni2022.sh.in
new file mode 100755
index 0000000000..10c9f25dbd
--- /dev/null
+++ b/MQ/mcbm/startBmonMoni2022.sh.in
@@ -0,0 +1,235 @@
+#!/bin/bash
+
+if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
+  @SIMPATH@/bin/fairmq-shmmonitor --cleanup
+fi
+
+if [ $# -ge 2 ]; then
+  _nbbranch=$1
+  _run_id=$2
+  ((_pubfreqts = $_nbbranch*100 ))
+  _pubminsec=1.0
+  _pubmaxsec=10.0
+
+  if [ $# -ge 5 ]; then
+    _filename=""
+    _dirname=""
+    _hostname=$5
+
+    if [ $# -ge 6 ]; then
+      _pubfreqts=$6
+
+      if [ $# -ge 7 ]; then
+        _pubminsec=$7
+
+        if [ $# -ge 8 ]; then
+          _pubmaxsec=$8
+        fi
+      fi
+    fi
+  elif [ $# -ge 3 ]; then
+    _filename=$3
+    _hostname=""
+    if [ $# -eq 4 ]; then
+      _dirname=$4
+    else
+      _dirname=""
+    fi
+  else
+    echo 'Missing parameters or wrong number of parameters.'
+    echo 'Possible usages are:'
+    echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <full filename pattern list>'
+    echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <filename pattern> <folder_path>'
+    echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list>'
+    echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS>'
+    echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
+    echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
+
+    return -1
+  fi
+else
+  echo 'Missing parameters. At least the number of branches and the trigger set are required'
+  echo 'Possible usages are:'
+  echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <full filename pattern list>'
+  echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> <filename pattern> <folder_path>'
+  echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list>'
+  echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS>'
+  echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
+  echo 'startBmonMoni2022.sh <Nb // branches> <Run Id> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
+
+  return -1
+fi
+
+_parfileSts=@VMCWORKDIR@/macro/beamtime/mcbm2022/mStsPar.par
+_parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2022/mMuchPar.par  # Valid from 2163
+_parfileTrdAsic=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.asic.par
+_parfileTrdDigi=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.digi.par
+_parfileTrdGas=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.gas.par
+_parfileTrdGain=@VMCWORKDIR@/parameters/trd/trd_v22d_mcbm.gain.par
+_parfileTof=@VMCWORKDIR@/macro/beamtime/mcbm2022/mTofCriPar.par
+_parfileBmon=@VMCWORKDIR@/macro/beamtime/mcbm2022/mBmonCriPar.par
+_parfileRich=@VMCWORKDIR@/macro/beamtime/mcbm2021/mRichPar_70.par
+_parfilePsd=@VMCWORKDIR@/macro/beamtime/mcbm2021/mPsdPar.par
+_setup_name=mcbm_beam_2022_03_22_iron
+
+if [ $_run_id -ge 2060 ]; then
+  if [ $_run_id -le 2065 ]; then
+    _setup_name=mcbm_beam_2022_03_09_carbon
+    _parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2022/mMuchParUpto26032022.par # Valid for runs 2060-2162
+  elif [ $_run_id -le 2160 ]; then # Potentially wrong setup between 2065 and 2150 but not official runs
+    _setup_name=mcbm_beam_2022_03_22_iron
+    _parfileMuch=@VMCWORKDIR@/macro/beamtime/mcbm2022/mMuchParUpto26032022.par # Valid for runs 2060-2162
+  elif [ $_run_id -le 2310 ]; then # Potentially wrong setup between 2160 and 2176 but not official runs
+    _setup_name=mcbm_beam_2022_03_28_uranium
+  fi
+fi
+
+
+_ratelog=0 # hides ZMQ messages rates and bandwidth
+#_ratelog=1 # display ZMQ messages rates and bandwidth
+
+LOGFILETAG=`hostname`
+LOGFILETAG+="_"
+LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S`
+LOGFILETAG+=".log"
+
+# Compute limits of TOF selection/trigger window
+_TofL=$_TofMean
+_TofH=$_TofMean
+(( _TofL -= _TofWin))
+(( _TofH += _TofWin))
+echo Tof window $_TofL - $_TofH
+
+LIST_OF_PIDS=""
+
+(( _paraBuffSz=100 ))
+(( _singBuffSz=_paraBuffSz*_nbbranch ))
+
+echo "Buffer size for parallel  devices $_paraBuffSz"
+echo "Buffer size for singleton devices $_singBuffSz"
+
+
+SAMPLER="RepReqTsSampler"
+SAMPLER+=" --control static"
+SAMPLER+=" --id sampler1"
+#SAMPLER+=" --max-timeslices 0"
+#SAMPLER+=" --max-timeslices 10"
+#SAMPLER+=" --max-timeslices 30"
+#SAMPLER+=" --max-timeslices 100"
+#SAMPLER+=" --max-timeslices 300"
+#SAMPLER+=" --max-timeslices 1000"
+SAMPLER+=" --max-timeslices -1"
+#SAMPLER+=" --severity info"
+#SAMPLER+=" --flib-port 10"
+if [ "$_hostname" != "" ]; then
+  SAMPLER+=" --fles-host $_hostname"
+elif [ "$_filename" != "" ]; then
+  SAMPLER+=" --filename $_filename"
+  if [ "$_dirname" != "" ]; then
+    SAMPLER+=" --dirname $_dirname"
+  fi
+fi
+SAMPLER+=" --high-water-mark 10"
+SAMPLER+=" --no-split-ts 1"
+SAMPLER+=" --ChNameMissTs missedts"
+SAMPLER+=" --ChNameCmds commands"
+SAMPLER+=" --PubFreqTs $_pubfreqts"
+SAMPLER+=" --PubTimeMin $_pubminsec"
+SAMPLER+=" --PubTimeMax $_pubmaxsec"
+SAMPLER+=" --channel-config name=ts-request,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11555,rateLogging=$_ratelog"
+SAMPLER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666,rateLogging=$_ratelog"
+SAMPLER+=" --channel-config name=missedts,type=pub,method=bind,address=tcp://127.0.0.1:11006,rateLogging=$_ratelog"
+SAMPLER+=" --channel-config name=commands,type=pub,method=bind,address=tcp://127.0.0.1:11007,rateLogging=$_ratelog"
+SAMPLER+=" --transport zeromq"
+# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
+# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
+SAMPLER_LOG="sampler1_$LOGFILETAG"
+# xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER  &
+nohup @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER  &> $SAMPLER_LOG &
+
+echo $SAMPLER
+
+LIST_OF_PIDS+=$!
+LIST_OF_PIDS+=" "
+
+PARAMETERSERVER="parmq-server"
+PARAMETERSERVER+=" --control static"
+PARAMETERSERVER+=" --id parmq-server"
+PARAMETERSERVER+=" --severity info"
+PARAMETERSERVER+=" --channel-name parameters"
+PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
+PARAMETERSERVER+=" --first-input-name $_parfileSts;$_parfileMuch;$_parfileTrdAsic;$_parfileTrdDigi;$_parfileTrdGas;$_parfileTrdGain;$_parfileTof;$_parfileBmon;$_parfileRich;$_parfilePsd"
+PARAMETERSERVER+=" --first-input-type ASCII"
+#PARAMETERSERVER+=" --libs-to-load=CbmStsBase;CbmMuchBase;CbmTrdBase;CbmTofBase;CbmRichBase" # doesn't work due to runtime problem
+PARAMETERSERVER+=" --setup $_setup_name"
+# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
+# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
+PARAMSRV_LOG="parmq_$LOGFILETAG"
+# xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1600+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
+nohup @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &> $PARAMSRV_LOG &
+
+LIST_OF_PIDS+=$!
+LIST_OF_PIDS+=" "
+
+HISTSERVER="MqHistoServer"
+HISTSERVER+=" --control static"
+HISTSERVER+=" --id server1"
+HISTSERVER+=" --severity info"
+HISTSERVER+=" --histport 8091"
+HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666,rateLogging=$_ratelog"
+HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667,rateLogging=0"
+HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668,rateLogging=0"
+# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
+# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
+HISTSRV_LOG="server1_$LOGFILETAG"
+# xterm -l -lf $HISTSRV_LOG -geometry 80x23+2000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
+nohup @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &> $HISTSRV_LOG &
+
+LIST_OF_PIDS+=$!
+LIST_OF_PIDS+=" "
+
+sleep 1
+
+_iBranch=0
+while (( _iBranch < _nbbranch )); do
+  (( _yOffset=200*_iBranch ))
+  (( _iBranch += 1 ))
+  (( _iPort = 11680 + _iBranch ))
+
+  UNPACKER="BmonMonitor"
+  UNPACKER+=" --control static"
+  UNPACKER+=" --id unp$_iBranch"
+  #UNPACKER+=" --severity error"
+  UNPACKER+=" --severity info"
+  #UNPACKER+=" --severity debug"
+  UNPACKER+=" --Setup $_setup_name"
+  UNPACKER+=" --RunId $_run_id"
+  UNPACKER+=" --IgnOverMs 1"
+  UNPACKER+=" --UnpBmon true"
+  UNPACKER+=" --SetTimeOffs kT0,0"
+  UNPACKER+=" --PubFreqTs $_pubfreqts"
+  UNPACKER+=" --PubTimeMin $_pubminsec"
+  UNPACKER+=" --PubTimeMax $_pubmaxsec"
+  UNPACKER+=" --TsNameOut unpts$_iBranch"
+  UNPACKER+=" --channel-config name=ts-request,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11555,rateLogging=$_ratelog"
+  UNPACKER+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
+  UNPACKER+=" --channel-config name=unpts$_iBranch,type=push,method=bind,transport=zeromq,sndBufSize=2,address=tcp://127.0.0.1:$_iPort,rateLogging=$_ratelog"
+#  UNPACKER+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007"
+  UNPACKER+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666,rateLogging=$_ratelog"
+  UNPACKER+=" --transport zeromq"
+  # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
+  # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
+  UNPACKER_LOG="unp$_iBranch"
+  UNPACKER_LOG+="_$LOGFILETAG"
+  # xterm -l -lf $UNPACKER_LOG -geometry 132x23+400+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$UNPACKER &
+  nohup @CMAKE_BINARY_DIR@/bin/MQ/mcbm/$UNPACKER &> $UNPACKER_LOG &
+
+  LIST_OF_PIDS+=$!
+  LIST_OF_PIDS+=" "
+done
+
+LIST_OF_PIDS+=$!
+LIST_OF_PIDS+=" "
+
+PID_LOG="pids_$LOGFILETAG"
+echo $LIST_OF_PIDS &> $PID_LOG
diff --git a/MQ/source/CbmMQTsSamplerRepReq.cxx b/MQ/source/CbmMQTsSamplerRepReq.cxx
index 61decdbeb4..5872da9caa 100644
--- a/MQ/source/CbmMQTsSamplerRepReq.cxx
+++ b/MQ/source/CbmMQTsSamplerRepReq.cxx
@@ -359,6 +359,18 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int)
     return true;
   }
 
+  /// TODO: add support for alternative request with "system name" instead of "system ID"
+  std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize());
+  if ("SendFirstTimesliceIndex" == reqStr) {
+    if (0 == fulFirstTsIndex) {  //
+      GetNewTs();
+    }
+    if (!SendFirstTsIndex() && !fbEofFound) {  //
+      return false;
+    }
+    return true;
+  }
+
   if (fbNoSplitTs) {
 
     if (!CreateAndSendFullTs() && !fbEofFound) {
@@ -374,7 +386,6 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int)
   }    // if( fbNoSplitTs )
   else if (fbSendTsPerSysId) {
     /// TODO: add support for alternative request with "system name" instead of "system ID"
-    std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize());
     int iSysId = std::stoi(reqStr);
     LOG(debug) << "Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec;
 
@@ -392,7 +403,6 @@ bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int)
     }  // if(!CreateAndCombineComponentsPerSysId(iSysId) && !fbEofFound)
   }    // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
   else if (fbSendTsPerBlock) {
-    std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize());
     LOG(debug) << "Received TS components block request from client: " << reqStr;
 
     /// This assumes that the order of the components does NOT change after the first TS
@@ -443,9 +453,15 @@ std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs()
       const fles::Timeslice& ts = *timeslice;
       uint64_t uTsIndex         = ts.index();
 
+      if (0 == fulFirstTsIndex) {  //
+        fulFirstTsIndex = ts.descriptor(0, 0).idx;
+      }
+
       if (0 < fuPublishFreqTs) {
         uint64_t uTsTime = ts.descriptor(0, 0).idx;
-        if (0 == fuStartTime) { fuStartTime = uTsTime; }  // if( 0 == fuStartTime )
+        if (0 == fuStartTime) {  //
+          fuStartTime = uTsTime;
+        }  // if( 0 == fuStartTime )
         fdTimeToStart    = static_cast<double_t>(uTsTime - fuStartTime) / 1e9;
         uint64_t uSizeMb = 0;
 
@@ -819,6 +835,36 @@ bool CbmMQTsSamplerRepReq::CreateCombinedComponentsPerBlock(std::string sBlockNa
   return false;
 }
 
+bool CbmMQTsSamplerRepReq::SendFirstTsIndex()
+{
+  // create the message with the first timeslice index
+  std::string sIndex = FormatDecPrintout(fulFirstTsIndex);
+  // serialize the vector and create the message
+  std::stringstream oss;
+  boost::archive::binary_oarchive oa(oss);
+  oa << sIndex;
+  std::string* strMsg = new std::string(oss.str());
+
+  FairMQMessagePtr msg(NewMessage(
+    const_cast<char*>(strMsg->c_str()),  // data
+    strMsg->length(),                    // size
+    [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
+    strMsg));  // object that manages the data
+
+  // in case of error or transfer interruption,
+  // return false to go to IDLE state
+  // successfull transfer will return number of bytes
+  // transfered (can be 0 if sending an empty message).
+  if (Send(msg, fsChannelNameTsRequest) < 0) {
+    LOG(error) << "Problem sending reply with first TS index";
+    return false;
+  }
+
+  fulMessageCounter++;
+  LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize();
+
+  return true;
+}
 bool CbmMQTsSamplerRepReq::SendData(const fles::StorableTimeslice& component)
 {
   // serialize the timeslice and create the message
diff --git a/MQ/source/CbmMQTsSamplerRepReq.h b/MQ/source/CbmMQTsSamplerRepReq.h
index a5d186f741..b12b6a268c 100644
--- a/MQ/source/CbmMQTsSamplerRepReq.h
+++ b/MQ/source/CbmMQTsSamplerRepReq.h
@@ -59,6 +59,7 @@ protected:
   double_t fdMinPublishTime            = 0.5;
   double_t fdMaxPublishTime            = 5;
 
+  uint64_t fulFirstTsIndex   = 0;
   uint64_t fulPrevTsIndex    = 0;
   uint64_t fulTsCounter      = 0;
   uint64_t fulMessageCounter = 0;
@@ -80,6 +81,7 @@ private:
   bool PrepareCompListPerBlock();
   bool CreateCombinedComponentsPerBlock(std::string sBlockName);
 
+  bool SendFirstTsIndex();
   bool SendData(const fles::StorableTimeslice& component);
   bool SendMissedTsIdx(std::vector<uint64_t> vIndices);
   bool SendCommand(std::string sCommand);
-- 
GitLab